From 2da128a1f516eb23e28c6b162a1f8023e7ac3533 Mon Sep 17 00:00:00 2001 From: SONIABHISHEK121 Date: Sun, 14 Jul 2024 12:25:57 +0530 Subject: [PATCH 1/7] test_pyemail.py Signed-off-by: SONIABHISHEK121 --- tests/test_pyemail.py | 143 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 128 insertions(+), 15 deletions(-) diff --git a/tests/test_pyemail.py b/tests/test_pyemail.py index 4fd23a5fb..6dbd5a049 100644 --- a/tests/test_pyemail.py +++ b/tests/test_pyemail.py @@ -24,6 +24,7 @@ from email.mime.multipart import MIMEMultipart import sys, os import smtplib +import base64 from configparser import SafeConfigParser pytestmark = [pytest.mark.dontusefix] @@ -33,44 +34,48 @@ config.read("rece.ini") conf_mail = dict(config.items("MAIL")) + def test_Connectar_Enviar(mocker): """Test de conexion""" mocker.patch("smtplib.SMTP") pyemail.Conectar( - conf_mail["servidor"], - conf_mail["usuario"], - conf_mail["clave"], - 25, - ) - pyemail.Enviar( - conf_mail["remitente"], "prueba", "check@gmail.com", None + conf_mail["servidor"], + conf_mail["usuario"], + conf_mail["clave"], + 25, ) + pyemail.Enviar(conf_mail["remitente"], "prueba", "check@gmail.com", None) - pyemail.Salir() + pyemail.Salir() smtplib.SMTP.assert_called_with(conf_mail["servidor"], 25) + def test_Crear(): - ok =pyemail.Crear() + ok = pyemail.Crear() assert ok def test_Agreagar_Destinatario(): - ok =pyemail.AgregarDestinatario("test@gmail.com") + ok = pyemail.AgregarDestinatario("test@gmail.com") assert ok + def test_AgregarCC(): - ok =pyemail.AgregarCC("test@gmail.com") + ok = pyemail.AgregarCC("test@gmail.com") assert ok + def test_AgregarBCC(): - ok =pyemail.AgregarBCC("test@gmail.com") + ok = pyemail.AgregarBCC("test@gmail.com") assert ok + def test_Adjuntar(): - ok =pyemail.Adjuntar("test@gmail.com") + ok = pyemail.Adjuntar("test@gmail.com") assert ok + def test_main(mocker): """Test de funcion main""" mocker.patch("smtplib.SMTP") @@ -83,8 +88,8 @@ def test_main(mocker): smtplib.SMTP.assert_called_with(conf_mail["servidor"], 25) -def test_main_prueba(mocker): +def test_main_prueba(mocker): mocker.patch("smtplib.SMTP") sys.argv = [] sys.argv.append("/prueba") @@ -92,4 +97,112 @@ def test_main_prueba(mocker): sys.argv.append("pass123") main() - smtplib.SMTP.assert_called_with('smtp.gmail.com', 587) \ No newline at end of file + smtplib.SMTP.assert_called_with("smtp.gmail.com", 587) + + +def test_Conectar_SSL(mocker): + mocker.patch("smtplib.SMTP_SSL") + pyemail.Conectar( + conf_mail["servidor"], conf_mail["usuario"], conf_mail["clave"], 465 + ) + smtplib.SMTP_SSL.assert_called_with(conf_mail["servidor"], 465) + + +def test_Conectar_TLS(mocker): + mock_smtp = mocker.patch("smtplib.SMTP") + pyemail.Conectar( + conf_mail["servidor"], conf_mail["usuario"], conf_mail["clave"], 587 + ) + mock_smtp.return_value.starttls.assert_called_once() + + +def test_Conectar_Exception(mocker): + mocker.patch("smtplib.SMTP", side_effect=Exception("Connection error")) + result = pyemail.Conectar( + conf_mail["servidor"], conf_mail["usuario"], conf_mail["clave"], 25 + ) + assert result is False + assert pyemail.Excepcion != "" + assert pyemail.Traceback != "" + + +def test_Enviar_CC_BCC(mocker): + mocker.patch("smtplib.SMTP") + pyemail.Conectar( + conf_mail["servidor"], conf_mail["usuario"], conf_mail["clave"], 25 + ) + pyemail.CC = [] # Clear CC list + pyemail.BCC = [] # Clear BCC list + pyemail.AgregarCC("cc@example.com") + pyemail.AgregarBCC("bcc@example.com") + pyemail.Enviar(conf_mail["remitente"], "prueba", "check@gmail.com", "Test message") + assert len(pyemail.CC) == 1 + assert len(pyemail.BCC) == 1 + + +def test_Salir_Exception(mocker): + mock_smtp = mocker.patch("smtplib.SMTP") + mock_smtp.return_value.quit.side_effect = Exception("Quit error") + pyemail.Conectar( + conf_mail["servidor"], conf_mail["usuario"], conf_mail["clave"], 25 + ) + result = pyemail.Salir() + assert result is False + assert pyemail.Excepcion != "" + assert pyemail.Traceback != "" + + +def test_Enviar_HTML(mocker): + mock_smtp = mocker.patch("smtplib.SMTP") + pyemail.Conectar( + conf_mail["servidor"], conf_mail["usuario"], conf_mail["clave"], 25 + ) + pyemail.MensajeHTML = "Test HTML" + pyemail.MensajeTexto = "Test plain text" + pyemail.Crear(conf_mail["remitente"], "prueba") + pyemail.AgregarDestinatario("check@gmail.com") + pyemail.Enviar() + + assert mock_smtp.return_value.sendmail.called + call_args = mock_smtp.return_value.sendmail.call_args[0][2] + + assert "Content-Type: multipart/related" in call_args + assert "Content-Type: multipart/alternative" in call_args + assert "Content-Type: text/text" in call_args + assert "Content-Type: text/html" in call_args + assert "Test plain text" in call_args + assert "Test HTML" in call_args + + # Reset for next tests + pyemail.MensajeHTML = None + pyemail.MensajeTexto = None + + +def test_Enviar_Con_Adjunto(mocker): + mock_smtp = mocker.patch("smtplib.SMTP") + pyemail.Conectar( + conf_mail["servidor"], conf_mail["usuario"], conf_mail["clave"], 25 + ) + pyemail.Crear(conf_mail["remitente"], "prueba") + pyemail.AgregarDestinatario("check@gmail.com") + + attachment_content = "This is a test attachment" + with open("test_attachment.txt", "w") as f: + f.write(attachment_content) + + pyemail.Adjuntar("test_attachment.txt") + pyemail.MensajeTexto = "Test message" + pyemail.Enviar() + + assert mock_smtp.return_value.sendmail.called + call_args = mock_smtp.return_value.sendmail.call_args[0][2] + + assert ( + 'Content-Disposition: attachment; filename="test_attachment.txt"' in call_args + ) + encoded_content = base64.b64encode(attachment_content.encode()).decode() + assert encoded_content in call_args + + os.remove("test_attachment.txt") + pyemail.adjuntos = [] + pyemail.MensajeTexto = None From 3748f93c322ec167850ba6d350f4f514aa35c39c Mon Sep 17 00:00:00 2001 From: SONIABHISHEK121 Date: Sun, 14 Jul 2024 12:35:14 +0530 Subject: [PATCH 2/7] test_pyi25.py Signed-off-by: SONIABHISHEK121 --- tests/test_pyi25.py | 61 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/tests/test_pyi25.py b/tests/test_pyi25.py index 46d084406..ff5844022 100644 --- a/tests/test_pyi25.py +++ b/tests/test_pyi25.py @@ -33,6 +33,7 @@ pyi25 = PyI25() + def test_GenerarImagen(): "Prueba de generación de imagen" barras = "2026756539302400161203034739042201105299" @@ -40,7 +41,7 @@ def test_GenerarImagen(): pyi25.GenerarImagen(barras, archivo) assert os.path.exists(archivo) - #compare the image with a reference one + # compare the image with a reference one ref = Image.open("tests/images/prueba-cae-i25.png") test = Image.open(archivo) diff = ImageChops.difference(ref, test) @@ -67,21 +68,77 @@ def test_DigitoVerificadorModulo10(): barras = barras + pyi25.DigitoVerificadorModulo10(barras) assert barras == "2026756539302400161203034739042201105299" + def test_main(): sys.argv = [] main() + def test_main_archivo(): sys.argv = [] sys.argv.append("--archivo") sys.argv.append("test123.png") main() + def test_main_mostrar(mocker): mocker.patch("os.system") sys.argv = [] sys.argv.append("--mostrar") archivo = "prueba-cae-i25.png" main() - if(sys.platform == 'linux2' or sys.platform == 'linux'): + if sys.platform == "linux2" or sys.platform == "linux": os.system.assert_called_with("eog " "%s" "" % archivo) + + +def test_DigitoVerificadorModulo10_edge_cases(): + assert pyi25.DigitoVerificadorModulo10("") == "" + assert pyi25.DigitoVerificadorModulo10("123") == "6" + assert pyi25.DigitoVerificadorModulo10("9999999999") == "0" + + +def test_main_custom_archivo(): + sys.argv = ["pyi25.py", "--archivo", "custom_test.jpg"] + main() + assert os.path.exists("custom_test.jpg") + os.remove("custom_test.jpg") + + +def test_GenerarImagen_odd_length_code(): + barras = "12345" + archivo = "odd_test.png" + pyi25.GenerarImagen(barras, archivo) + assert os.path.exists(archivo) + with Image.open(archivo) as img: + assert img.size[0] > len(barras) * 3 # Check if 0 was prepended + os.remove(archivo) + + +def test_DigitoVerificadorModulo10_non_digit(): + assert pyi25.DigitoVerificadorModulo10("12A34") == "" + + +def test_DigitoVerificadorModulo10_large_number(): + large_number = "9" * 1000 + result = pyi25.DigitoVerificadorModulo10(large_number) + assert len(result) == 1 and result.isdigit() + + +def test_GenerarImagen_with_custom_params(): + barras = "1234567890" + archivo = "custom_params.png" + pyi25.GenerarImagen( + barras, archivo, basewidth=5, width=300, height=50, extension="PNG" + ) + assert os.path.exists(archivo) + with Image.open(archivo) as img: + assert img.size == (300, 50) + + +def test_main_with_noverificador(): + sys.argv = ["pyi25.py", "--barras", "1234567890", "--noverificador"] + main() + assert os.path.exists("prueba-cae-i25.png") + with open("prueba-cae-i25.png", "rb") as f: + content = f.read() + assert len(content) > 0 From 363fb2a4bcb2e70adc79f3c7244e99f40e0955d2 Mon Sep 17 00:00:00 2001 From: SONIABHISHEK121 Date: Sun, 14 Jul 2024 12:44:17 +0530 Subject: [PATCH 3/7] test_ws_sr_padron.py Signed-off-by: SONIABHISHEK121 --- tests/test_ws_sr_padron.py | 81 +++++++++++++++++++++++++++++++++----- 1 file changed, 71 insertions(+), 10 deletions(-) diff --git a/tests/test_ws_sr_padron.py b/tests/test_ws_sr_padron.py index 7da0d0002..2370ba506 100644 --- a/tests/test_ws_sr_padron.py +++ b/tests/test_ws_sr_padron.py @@ -19,7 +19,8 @@ __copyright__ = "Copyright (C) 2010-2019 Mariano Reingart" __license__ = "GPL 3.0" -import os, sys +import os +import sys import pytest from pyafipws.wsaa import WSAA @@ -35,16 +36,14 @@ PKEY = "reingart.key" CACHE = "" -pytestmark =pytest.mark.vcr - - +pytestmark = pytest.mark.vcr @pytest.mark.xfail def test_server_status(auth): """Test de estado de servidores.""" # Estados de servidores respuesta no funciona afip - wspa5=auth + wspa5 = auth wspa5.Dummy() assert wspa5.AppServerStatus == "OK" assert wspa5.DbServerStatus == "OK" @@ -53,17 +52,16 @@ def test_server_status(auth): def test_inicializar(auth): """Test inicializar variables de BaseWS.""" - wspa5=auth + wspa5 = auth wspa5.inicializar() assert wspa5.tipo_doc == 0 assert wspa5.denominacion == "" assert wspa5.actividades == [] - def test_consultar(auth): """Test consultar.""" - wspa5=auth + wspa5 = auth # Consulta Nivel A4 afip no esta funcionando. id_persona = "20201797064" consulta = wspa5.Consultar(id_persona) @@ -72,7 +70,7 @@ def test_consultar(auth): def test_consultar_a5(auth): """Test consultar padron nivel A5.""" - wspa5=auth + wspa5 = auth id_persona = "20201797064" consulta = wspa5.Consultar(id_persona) @@ -89,7 +87,7 @@ def test_main(auth): sys.argv = [] sys.argv.append("--debug") sys.argv.append("--constancia") - sys.argv.append('--prueba') + sys.argv.append("--prueba") padron = main() assert padron.denominacion == "ERNESTO DANIEL, MARCELO NICOLAS" assert padron.tipo_doc == 80 @@ -108,3 +106,66 @@ def test_main_csv(auth): sys.argv.append("--csv") main() assert os.path.isfile("salida.csv") + + +@pytest.fixture +def mock_client(mocker): + client = mocker.Mock() + client.getPersona.return_value = { + "personaReturn": { + "persona": { + "idPersona": "20000000001", + "tipoPersona": "FISICA", + "tipoClave": "CUIT", + "numeroDocumento": "00000001", + "estadoClave": "ACTIVO", + "nombre": "John", + "apellido": "Doe", + "domicilio": [ + { + "tipoDomicilio": "FISCAL", + "direccion": "Test Street 123", + "localidad": "Test City", + "idProvincia": "01", + "codPostal": "12345", + } + ], + "impuesto": [{"idImpuesto": "30", "estado": "ACTIVO"}], + "actividad": [{"idActividad": "123456"}], + "categoria": [{"idImpuesto": "20", "estado": "ACTIVO"}], + } + } + } + return client + + +def test_consultar(mock_client): + ws = WSSrPadronA4() + ws.client = mock_client + ws.Sign = "test_sign" + ws.Token = "test_token" + ws.Cuit = "test_cuit" + + result = ws.Consultar("20000000001") + + assert result == True + assert ws.cuit == "20000000001" + assert ws.tipo_persona == "FISICA" + assert ws.tipo_doc == 80 + assert ws.nro_doc == "00000001" + assert ws.estado == "ACTIVO" + assert ws.denominacion == "Doe, John" + assert ws.direccion == "Test Street 123" + assert ws.localidad == "Test City" + assert ws.provincia == "" + assert ws.cod_postal == "12345" + assert ws.domicilio == "Test Street 123 - Test City (12345) - " + assert ws.impuestos == ["30"] + assert ws.actividades == ["123456"] + + mock_client.getPersona.assert_called_once_with( + sign="test_sign", + token="test_token", + cuitRepresentada="test_cuit", + idPersona="20000000001", + ) From 92f3c7bc244e13f92415069716ce4f17cd78574f Mon Sep 17 00:00:00 2001 From: SONIABHISHEK121 Date: Sun, 14 Jul 2024 16:47:24 +0530 Subject: [PATCH 4/7] test_wsaa.py Signed-off-by: SONIABHISHEK121 --- tests/test_wsaa.py | 289 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 258 insertions(+), 31 deletions(-) diff --git a/tests/test_wsaa.py b/tests/test_wsaa.py index 6fcafe6c2..69bb4c1b8 100644 --- a/tests/test_wsaa.py +++ b/tests/test_wsaa.py @@ -17,16 +17,17 @@ __license__ = "GPL 3.0" import pytest +import io import os import sys import base64 -from pyafipws.wsaa import WSAA, call_wsaa, sign_tra_openssl +import time +import datetime +from pyafipws.wsaa import WSAA, call_wsaa, sign_tra_openssl, DEFAULT_TTL from pyafipws.wsaa import main from past.builtins import basestring from builtins import str -from pyafipws.utils import * -from pysimplesoap import * -DEFAULT_TTL = 60 * 60 * 5 # five hours +from pyafipws.utils import SimpleXMLElement, date WSDL = "https://wsaahomo.afip.gov.ar/ws/services/LoginCms" CACERT = "conf/afip_ca_info.crt" @@ -34,14 +35,63 @@ pytestmark = [pytest.mark.dontusefix] -#fixture for key and certificate @pytest.fixture def key_and_cert(): - KEY = 'reingart.key' - CERT = 'reingart.crt' + """Fixture for key and certificate""" + KEY = "reingart.key" + CERT = "reingart.crt" return [KEY, CERT] +@pytest.fixture +def wsaa_instance(): + """Fixture que devuelve una nueva instancia de WSAA para cada prueba.""" + return WSAA() + + +@pytest.fixture +def mock_dependencies(monkeypatch): + """ + Fixture que simula varias dependencias y métodos de WSAA para las pruebas. + Configura un entorno controlado para las operaciones de WSAA. + """ + monkeypatch.setattr("pyafipws.wsaa.DEBUG", True) + monkeypatch.setattr("os.path.exists", lambda x: False) + monkeypatch.setattr("os.path.getsize", lambda x: 0) + monkeypatch.setattr("os.path.getmtime", lambda x: 0) + monkeypatch.setattr("time.time", lambda: DEFAULT_TTL + 1) + monkeypatch.setattr(WSAA, "CreateTRA", lambda self, service, ttl: "mock_tra") + monkeypatch.setattr(WSAA, "SignTRA", lambda self, tra, crt, key: "mock_cms") + monkeypatch.setattr(WSAA, "Conectar", lambda self, *args, **kwargs: True) + monkeypatch.setattr(WSAA, "LoginCMS", lambda self, cms: "mock_ta") + monkeypatch.setattr(WSAA, "AnalizarXml", lambda self, xml: None) + monkeypatch.setattr( + WSAA, + "ObtenerTagXml", + lambda self, tag: "mock_token" if tag == "token" else "mock_sign", + ) + + +@pytest.fixture +def mock_imports(monkeypatch): + """ + Fixture para simular el fallo de importación de cryptography y la + información de excepción. + """ + + def mock_exception_info(): + return { + "msg": "ModuleNotFoundError: import of cryptography halted;" + " None in sys.modules" + } + + monkeypatch.setitem(sys.modules, "cryptography", None) + monkeypatch.setattr("pyafipws.wsaa.exception_info", mock_exception_info) + yield + if "cryptography" in sys.modules: + del sys.modules["cryptography"] + + def test_analizar_certificado(key_and_cert): """Test analizar datos en certificado.""" wsaa = WSAA() @@ -55,7 +105,7 @@ def test_crear_clave_privada(): """Test crear clave RSA.""" wsaa = WSAA() chk = wsaa.CrearClavePrivada() - assert chk == True + assert chk is True def test_crear_pedido_certificado(): @@ -63,8 +113,8 @@ def test_crear_pedido_certificado(): wsaa = WSAA() chk1 = wsaa.CrearClavePrivada() chk2 = wsaa.CrearPedidoCertificado() - assert chk1 == True - assert chk2 == True + assert chk1 is True + assert chk2 is True def test_expirado(): @@ -72,18 +122,18 @@ def test_expirado(): wsaa = WSAA() # checking for expired certificate script_dir = os.path.dirname(__file__) - file_path = os.path.join(script_dir, 'xml/expired_ta.xml') + file_path = os.path.join(script_dir, "xml/expired_ta.xml") chk = wsaa.AnalizarXml(xml=open(file_path, "r").read()) chk2 = wsaa.Expirado() - # checking for a valid certificate,i.e. which will + # checking for a valid certificate,i.e. which will # have expiration time 12 hrs(43200 secs) from generation fec = str(date("c", date("U") + 43200)) chk3 = wsaa.Expirado(fecha=fec) - assert chk == True - assert chk2 == True - assert chk3 == False + assert chk is True + assert chk2 is True + assert not chk3 @pytest.mark.vcr @@ -99,14 +149,16 @@ def test_login_cms(key_and_cert): ta = SimpleXMLElement(ta_xml) if not isinstance(cms, str): - cms = cms.decode('utf-8') + cms = cms.decode("utf-8") assert isinstance(cms, str) - - assert cms.startswith('MII') - assert chk == True - assert ta_xml.startswith('') + assert cms.startswith("MII") + + assert chk is True + assert ta_xml.startswith( + '' + ) assert ta.credentials.token assert ta.credentials.sign @@ -128,7 +180,8 @@ def test_wsaa_create_tra(): # sanity checks: assert isinstance(tra, basestring) assert tra.startswith( - '' '' + '' + '' ) assert "" in tra assert "" in tra @@ -138,7 +191,10 @@ def test_wsaa_create_tra(): def test_wsaa_sign(): wsaa = WSAA() - tra = '' + tra = ( + '' + '' + ) # TODO: use certificate and private key as fixture / PEM text (not files) cms = wsaa.SignTRA(tra, "reingart.crt", "reingart.key") # TODO: return string @@ -157,25 +213,49 @@ def test_wsaa_sign_tra(key_and_cert): sign = wsaa.SignTRA(tra, key_and_cert[1], key_and_cert[0]) if not isinstance(sign, str): - sign = sign.decode('utf-8') + sign = sign.decode("utf-8") assert isinstance(sign, str) assert sign.startswith("MII") -def test_wsaa_sign_openssl(key_and_cert): - wsaa = WSAA() +def test_wsaa_sign_openssl(key_and_cert, monkeypatch): + """ + Prueba de la función sign_tra_openssl: + 1. Verificar la firma exitosa + 2. Comprobar el comportamiento cuando OpenSSL no está en la RUTA (PATH) + 3. Verificar el manejo de otros errores del sistema operativo (OSErrors) + """ + + wsaa = WSAA() tra = wsaa.CreateTRA("wsfe").encode() + + # Test successful signing sign = sign_tra_openssl(tra, key_and_cert[1], key_and_cert[0]) # check if the commanmd line input is a byte data assert isinstance(sign, bytes) + assert sign.decode("utf8").startswith("MII") - if isinstance(sign, bytes): - sign = sign.decode("utf8") - - assert sign.startswith("MII") + # Test OpenSSL not in PATH + def mock_popen_not_found(*args, **kwargs): + raise OSError(2, "File not found") + + monkeypatch.setattr("pyafipws.wsaa.Popen", mock_popen_not_found) + with pytest.warns( + UserWarning, + match="El ejecutable de OpenSSL no esta disponible en el PATH" + ): + with pytest.raises(OSError): + sign_tra_openssl(tra, key_and_cert[1], key_and_cert[0]) + # Test other OSError + def mock_popen_other_error(*args, **kwargs): + raise OSError(1, "Operation not permitted") + + monkeypatch.setattr("pyafipws.wsaa.Popen", mock_popen_other_error) + with pytest.raises(OSError): + sign_tra_openssl(tra, key_and_cert[1], key_and_cert[0]) def test_wsaa_sign_tra_inline(key_and_cert): @@ -189,10 +269,10 @@ def test_wsaa_sign_tra_inline(key_and_cert): ) if not isinstance(sign, str): - sign = sign.decode('utf-8') + sign = sign.decode("utf-8") if not isinstance(sign_2, str): - sign_2 = sign_2.decode('utf-8') + sign_2 = sign_2.decode("utf-8") assert isinstance(sign, str) assert sign.startswith("MII") @@ -240,3 +320,150 @@ def test_call_wsaa(key_and_cert): tra = wsaa.CreateTRA(service="wsfe", ttl=DEFAULT_TTL) cms = wsaa.SignTRA(tra, key_and_cert[1], key_and_cert[0]) assert call_wsaa(cms, WSDL) + + +def test_import_error_handling(mock_imports): + """ + Caso de prueba para verificar el manejo de errores cuando falla la importación del módulo cryptography. + Verifica advertencias correctas, presencia de atributos y contenido del mensaje de error. + """ + + with pytest.warns(UserWarning) as w: + import importlib + + importlib.reload(pytest.importorskip("pyafipws.wsaa")) + + wsaa = pytest.importorskip("pyafipws.wsaa") + assert wsaa.Binding is None + assert all(hasattr(wsaa, attr) for attr in ["Popen", "PIPE", "b64encode"]) + assert len(w) == 2 + assert str(w[0].message) == "No es posible importar cryptography (OpenSSL)" + assert ( + str(w[1].message).strip() == + "ModuleNotFoundError: import of cryptography halted; None in sys.modules" + ) + + +def test_autenticar_full_flow(wsaa_instance, mock_dependencies, tmp_path, capsys, monkeypatch, mocker): + """ + Prueba el flujo completo de autenticación de WSAA. + Esto incluye crear un nuevo TA, leer un TA existente y manejar fallos de conexión. + Verifica el comportamiento correcto en cada escenario y comprueba el manejo adecuado de errores. + """ + # Create mock certificate and key files + mock_cert = tmp_path / "test.crt" + mock_key = tmp_path / "test.key" + mock_cert.write_text("Mock certificate content") + mock_key.write_text("Mock key content") + + # Mock os.access to always return True for our mock files + def mock_access(path, mode): + return str(path) in (str(mock_cert), str(mock_key)) + + monkeypatch.setattr(os, "access", mock_access) + + # Test new TA creation + result = wsaa_instance.Autenticar( + "test_service", str(mock_cert), str(mock_key), cache=str(tmp_path) + ) + + assert result == "mock_ta" + assert wsaa_instance.Token == "mock_token" + assert wsaa_instance.Sign == "mock_sign" + + captured = capsys.readouterr() + assert "Creando TRA..." in captured.out + assert "Firmando TRA..." in captured.out + assert "Conectando a WSAA..." in captured.out + assert "Llamando WSAA..." in captured.out + assert "Grabando TA en" in captured.out + + # Test existing TA + ta_file = tmp_path / "TA-test.xml" + ta_file.write_text("existing_ta") + + monkeypatch.setattr("os.path.exists", lambda x: True) + monkeypatch.setattr("os.path.getsize", lambda x: 100) + monkeypatch.setattr("os.path.getmtime", lambda x: time.time()) + + # Mock open to return our mock TA content + monkeypatch.setattr( + "builtins.open", lambda *args, **kwargs: io.StringIO("existing_ta") + ) + + # Ensure LoginCMS is not called when TA exists + monkeypatch.setattr( + WSAA, "LoginCMS", lambda self, cms: pytest.fail("LoginCMS should not be called") + ) + + result = wsaa_instance.Autenticar( + "test_service", str(mock_cert), str(mock_key), cache=str(tmp_path) + ) + assert result == "existing_ta" + + captured = capsys.readouterr() + assert "Leyendo TA de" in captured.out + + # Test connection failure + monkeypatch.setattr( + "os.path.getmtime", lambda x: time.time() - 36000 + ) # Make TA appear 10 hours old + mock_conectar = mocker.patch.object(WSAA, "Conectar", return_value=False) + mock_create_tra = mocker.patch.object(WSAA, "CreateTRA", return_value="mock_tra") + mock_sign_tra = mocker.patch.object(WSAA, "SignTRA", return_value="mock_cms") + + with pytest.raises(RuntimeError) as excinfo: + wsaa_instance.Autenticar( + "test_service", str(mock_cert), str(mock_key), cache=str(tmp_path) + ) + + assert "Fallo la conexión:" in str(excinfo.value) + assert mock_conectar.called + assert mock_create_tra.called + assert mock_sign_tra.called + + # Check that Excepcion contains the expected error message + assert "Fallo la conexión:" in wsaa_instance.Excepcion + + +def test_autenticar_error_handling(mocker): + """ + Prueba el método AnalizarCertificado con una entrada de certificado binario. + Verifica que el método analice correctamente el certificado y establezca los atributos apropiados. + """ + wsaa = WSAA() + mocker.patch("os.access", return_value=False) + + with pytest.raises(RuntimeError, match="Imposible abrir"): + wsaa.Autenticar("test_service", "nonexistent.crt", "nonexistent.key") + + +def test_expirado_with_custom_date(): + """ + Prueba el método Expirado con fechas personalizadas futuras y pasadas. + """ + wsaa = WSAA() + + future_date = datetime.datetime.now() + datetime.timedelta(days=1) + assert not wsaa.Expirado(future_date.strftime("%Y-%m-%dT%H:%M:%S")) + + past_date = datetime.datetime.now() - datetime.timedelta(days=1) + assert wsaa.Expirado(past_date.strftime("%Y-%m-%dT%H:%M:%S")) + + +def test_analizar_certificado_binary(mocker): + """ + Prueba el método Expirado con fechas personalizadas futuras y pasadas. + """ + wsaa = WSAA() + mock_cert = mocker.Mock() + mocker.patch("pyafipws.wsaa.x509.load_pem_x509_certificate", return_value=mock_cert) + + binary_cert = b"-----BEGIN CERTIFICATE-----\nMIID...\n" b"-----END CERTIFICATE-----" + result = wsaa.AnalizarCertificado(binary_cert, binary=True) + + assert result is True + assert wsaa.Identidad == mock_cert.subject + assert wsaa.Caducidad == mock_cert.not_valid_after + assert wsaa.Emisor == mock_cert.issuer + assert wsaa.CertX509 == mock_cert From edcbe3a82c3ae3084aa1f64f336bccd90db8f8e0 Mon Sep 17 00:00:00 2001 From: SONIABHISHEK121 Date: Wed, 17 Jul 2024 12:47:26 +0530 Subject: [PATCH 5/7] test_cot.py Signed-off-by: SONIABHISHEK121 --- .coveragerc | 1 - tests/test_cot.py | 275 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 275 insertions(+), 1 deletion(-) create mode 100644 tests/test_cot.py diff --git a/.coveragerc b/.coveragerc index 6f753e0a9..d4a1c2aab 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,7 +1,6 @@ [run] omit = - *cot* *iibb* *nsis* */padron.py diff --git a/tests/test_cot.py b/tests/test_cot.py new file mode 100644 index 000000000..b5051c0d7 --- /dev/null +++ b/tests/test_cot.py @@ -0,0 +1,275 @@ +"""Test the COT (Comprobante de Operaciones en Tránsito) module. + +This module contains tests for the COT class, which is used to interact with the +AFIP COT web service. The tests cover various scenarios, including connecting +to the service, presenting remitos, reading validation responses, and handling +errors. +""" +#!/usr/bin/python +# -*- coding: utf8 -*- +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 3, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. + +"""Test para cot""" + +__author__ = "Mariano Reingart " +__copyright__ = "Copyright (C) 2010-2019 Mariano Reingart" +__license__ = "GPL 3.0" + +"""Test the COT (Comprobante de Operaciones en Tránsito) module. + +This module contains tests for the COT class, which is used to interact with the +AFIP COT web service. The tests cover various scenarios, including connecting +to the service, presenting remitos, reading validation responses, and handling +errors. +""" + +import pytest +from pyafipws.cot import COT, INSTALL_DIR, __version__, HOMO +from pysimplesoap.simplexml import SimpleXMLElement + + +@pytest.fixture +def cot_instance(): + """Fixture to create a COT instance for testing.""" + return COT() + + +@pytest.mark.dontusefix +def test_conectar_with_error(cot_instance): + """ + Test the Conectar method with invalid parameters. + Expects an exception to be raised. + """ + with pytest.raises(Exception): + cot_instance.Conectar(url="invalid_url", proxy="invalid_proxy") + + +@pytest.mark.dontusefix +def test_presentar_remito_with_file_not_found(cot_instance, monkeypatch): + """ + Test PresentarRemito method when the file is not found. + Expects the method to return False and set an appropriate error message. + """ + monkeypatch.setattr("os.path.exists", lambda x: False) + result = cot_instance.PresentarRemito("non_existent_file.txt") + assert result is False + assert "Archivo no encontrado" in cot_instance.Excepcion + + +@pytest.mark.dontusefix +def test_leer_validacion_remito(cot_instance): + """ + Test LeerValidacionRemito method. + Checks if the method correctly reads and sets remito validation data. + """ + cot_instance.remitos = [ + { + "NumeroUnico": "123", + "Procesado": "SI", + "COT": "COT123", + "Errores": [("E001", "Error 1")], + } + ] + assert cot_instance.LeerValidacionRemito() is True + assert cot_instance.NumeroUnico == "123" + assert cot_instance.Procesado == "SI" + assert cot_instance.COT == "COT123" + assert cot_instance.LeerValidacionRemito() is False + + +@pytest.mark.dontusefix +def test_leer_error_validacion(cot_instance): + """ + Test LeerErrorValidacion method. + Verifies if the method correctly reads and sets error validation data. + """ + cot_instance.errores = [("E001", "Error 1"), ("E002", "Error 2")] + assert cot_instance.LeerErrorValidacion() is True + assert cot_instance.CodigoError == "E002" + assert cot_instance.MensajeError == "Error 2" + assert cot_instance.LeerErrorValidacion() is True + assert cot_instance.CodigoError == "E001" + assert cot_instance.MensajeError == "Error 1" + assert cot_instance.LeerErrorValidacion() is False + + +@pytest.mark.dontusefix +def test_analizar_xml_with_invalid_xml(cot_instance): + """ + Test AnalizarXml method with invalid XML. + Expects the method to return False and set an appropriate error message. + """ + assert cot_instance.AnalizarXml("") is False + assert "no element found" in cot_instance.Excepcion + + +@pytest.mark.dontusefix +def test_main_function(monkeypatch): + """ + Test the main function of the COT module. + Mocks necessary dependencies and checks if + the function runs without errors. + """ + monkeypatch.setattr( + "sys.argv", ["cot.py", "test_file.txt", "test_user", "test_password"] + ) + monkeypatch.setattr( + "pyafipws.cot.COT.PresentarRemito", + lambda self, filename, testing="": True + ) + from pyafipws.cot import main + + main() + + +@pytest.mark.dontusefix +def test_cot_initialization(monkeypatch): + """ + Test the initialization and basic functionality of the COT class. + Mocks WebClient and file operations, + then checks various attributes and methods. + """ + + def mock_webclient(*args, **kwargs): + return lambda *a, **k: "" + + monkeypatch.setattr("pyafipws.cot.WebClient", mock_webclient) + monkeypatch.setattr("builtins.open", lambda *args: None) + monkeypatch.setattr("os.path.exists", lambda x: True) + + cot = COT() + cot.Usuario = "test_user" + cot.Password = "test_password" + + cot.Conectar() + assert cot.client is not None + + result = cot.PresentarRemito("test_file.txt") + assert result is True + assert cot.XmlResponse == "" + + assert cot.LeerErrorValidacion() is False + assert cot.LeerValidacionRemito() is False + assert cot.AnalizarXml("") is True + assert cot.ObtenerTagXml("test") is None + + assert cot.InstallDir == INSTALL_DIR + expected_version = ( + f"{__version__} {'Homologación' if HOMO else ''}".strip() + ) + assert cot.Version.strip() == expected_version + assert set(cot._public_methods_) == { + "Conectar", + "PresentarRemito", + "LeerErrorValidacion", + "LeerValidacionRemito", + "AnalizarXml", + "ObtenerTagXml", + } + assert cot._reg_progid_ == "COT" + assert cot._reg_clsid_ == "{7518B2CF-23E9-4821-BC55-D15966E15620}" + + +@pytest.mark.dontusefix +def test_presentar_remito_with_different_responses(cot_instance, monkeypatch): + """ + Test PresentarRemito method with various XML responses. + Checks if the method correctly handles different response structures. + """ + responses = [ + "0", + ( + "1E001" + "Test Error" + ), + ( + "123456789" + "12345" + ), + ( + "123" + "SICOT123" + "" + ), + ] + + for response in responses: + monkeypatch.setattr( + "pyafipws.cot.WebClient", + lambda *args, **kwargs: lambda *a, **k: response + ) + monkeypatch.setattr("builtins.open", lambda *args: None) + monkeypatch.setattr("os.path.exists", lambda x: True) + + result = cot_instance.PresentarRemito("test.txt") + assert result is False + cot_instance.AnalizarXml(response) + + +@pytest.mark.dontusefix +def test_presentar_remito_error_handling(cot_instance, monkeypatch): + """ + Test error handling in PresentarRemito method. + Simulates an exception and checks if it's properly handled. + """ + + def raise_exception(*args, **kwargs): + raise Exception("Test exception") + + monkeypatch.setattr( + "pyafipws.cot.WebClient", lambda *args, **kwargs: raise_exception + ) + monkeypatch.setattr("builtins.open", lambda *args: None) + monkeypatch.setattr("os.path.exists", lambda x: True) + + result = cot_instance.PresentarRemito("test.txt") + assert result is False + assert cot_instance.Excepcion != "" + + +@pytest.mark.dontusefix +def test_obtener_tag_xml(cot_instance): + """ + Test ObtenerTagXml method. + Checks if the method correctly retrieves values from XML tags. + """ + xml = ( + "value1value2" + ) + cot_instance.xml = SimpleXMLElement(xml) + assert cot_instance.ObtenerTagXml("tag1") == "value1" + assert cot_instance.ObtenerTagXml("tag2", "subtag") == "value2" + assert cot_instance.ObtenerTagXml("nonexistent") is None + + +@pytest.mark.dontusefix +def test_analizar_xml_error_handling(cot_instance): + """ + Test error handling in AnalizarXml method. + Checks if the method properly handles invalid XML input. + """ + assert cot_instance.AnalizarXml("") is False + assert "no element found" in cot_instance.Excepcion + + +@pytest.mark.dontusefix +def test_limpiar(cot_instance): + """ + Test limpiar method. + Verifies if the method correctly resets instance attributes. + """ + cot_instance.XmlResponse = "test" + cot_instance.Excepcion = "test" + cot_instance.TipoError = "test" + cot_instance.limpiar() + assert cot_instance.XmlResponse == "" + assert cot_instance.Excepcion == "" + assert cot_instance.TipoError == "" From b6da2b6d5a1e5a41f0a96957220637f0bd5b08d0 Mon Sep 17 00:00:00 2001 From: SONIABHISHEK121 Date: Wed, 17 Jul 2024 13:29:59 +0530 Subject: [PATCH 6/7] update test_cot.py Signed-off-by: SONIABHISHEK121 --- tests/test_cot.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/test_cot.py b/tests/test_cot.py index b5051c0d7..ca3f77b81 100644 --- a/tests/test_cot.py +++ b/tests/test_cot.py @@ -1,10 +1,3 @@ -"""Test the COT (Comprobante de Operaciones en Tránsito) module. - -This module contains tests for the COT class, which is used to interact with the -AFIP COT web service. The tests cover various scenarios, including connecting -to the service, presenting remitos, reading validation responses, and handling -errors. -""" #!/usr/bin/python # -*- coding: utf8 -*- # This program is free software; you can redistribute it and/or modify From 5e94653b8f800ad2f11ae8502e60d8611137635e Mon Sep 17 00:00:00 2001 From: SONIABHISHEK121 Date: Fri, 19 Jul 2024 00:21:08 +0530 Subject: [PATCH 7/7] test suite for recex1.py Signed-off-by: SONIABHISHEK121 --- recex1.py | 11 +- tests/test_recex1.py | 507 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 517 insertions(+), 1 deletion(-) create mode 100644 tests/test_recex1.py diff --git a/recex1.py b/recex1.py index 3162e37e6..28d4e58bb 100644 --- a/recex1.py +++ b/recex1.py @@ -29,6 +29,7 @@ import traceback # revisar la instalación de pyafip.ws: +from pyafipws.wsaa import WSAA from pyafipws import wsfexv1 from pyafipws.utils import SimpleXMLElement, SoapClient, SoapFault, date from pyafipws.utils import leer, escribir, leer_dbf, guardar_dbf, N, A, I, abrir_conf @@ -333,10 +334,18 @@ def main(): if config.has_option("WSFEXv1", "TIMEOUT"): TIMEOUT = int(config.get("WSFEXv1", "TIMEOUT")) + + # This change handles the case where 'proxy_port' may not be present in the config. + # It ensures that we only try to convert 'proxy_port' to an integer if it exists, + # preventing KeyError exceptions and making the code more robust to different + # configuration scenarios. This modification allows the function to work with + # or without proxy settings, improving its flexibility and error handling. if config.has_section("PROXY") and not HOMO: proxy_dict = dict(("proxy_%s" % k, v) for k, v in config.items("PROXY")) - proxy_dict["proxy_port"] = int(proxy_dict["proxy_port"]) + if "proxy_port" in proxy_dict: + proxy_dict["proxy_port"] = int(proxy_dict["proxy_port"]) + else: proxy_dict = {} diff --git a/tests/test_recex1.py b/tests/test_recex1.py new file mode 100644 index 000000000..787e873d5 --- /dev/null +++ b/tests/test_recex1.py @@ -0,0 +1,507 @@ +#!/usr/bin/python +# -*- coding: utf8 -*- +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 3, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. + +"""Test para Módulo recex1""" + +__author__ = "Mariano Reingart " +__copyright__ = "Copyright (C) 2010-2019 Mariano Reingart" +__license__ = "GPL 3.0" + +import io +import sys +import time +from unittest.mock import patch, MagicMock + +import pytest + +from pyafipws import recex1 + + +@pytest.fixture +def mock_ws(): + """ + Fixture que crea un mock del objeto WebService. + """ + return MagicMock() + + +@pytest.fixture +def mock_config(mocker): + """ + Fixture que mockea la función abrir_conf de recex1. + """ + return mocker.patch("pyafipws.recex1.abrir_conf") + + +@pytest.fixture +def mock_escribir(mocker): + """ + Fixture que mockea la función escribir de recex1 y retorna un valor fijo. + """ + mock = mocker.patch("pyafipws.recex1.escribir") + mock.return_value = "mocked_string" + return mock + + +@pytest.fixture +def mock_guardar_dbf(mocker): + """ + Fixture que mockea la función guardar_dbf de recex1. + """ + return mocker.patch("pyafipws.recex1.guardar_dbf") + + +@pytest.fixture +def mock_conf_dbf(mocker): + """ + Fixture que mockea el diccionario conf_dbf en recex1. + """ + return mocker.patch.dict( + "pyafipws.recex1.__dict__", {"conf_dbf": {"test": "config"}} + ) + + +@pytest.mark.dontusefix +def test_autorizar_with_testing_arg(mock_ws, mocker): + """ + Prueba la función autorizar con el argumento de testing. + """ + mock_entrada = io.StringIO( + ( + "0,20230101,1,1,1,1,S,203,Test Client,12345678901,Test Address," + "Test ID,1000.00,PES,1.0,Test Obs,Test Gen Obs,Test Payment,FOB," + "Test Incoterms,1\n" + ) + ) + mock_salida = io.StringIO() + mocker.patch.object(mock_ws, "GetLastID", return_value=1) + mocker.patch.object(mock_ws, "GetLastCMP", return_value=1) + mocker.patch("pyafipws.recex1.escribir_factura") + mocker.patch( + "pyafipws.recex1.leer", + return_value={ + "id": "", + "tipo_cbte": "1", + "punto_vta": "1", + "cbte_nro": "1", + "fecha_cbte": "20230101", + "tipo_expo": "1", + "permiso_existente": "S", + "pais_dst_cmp": "203", + "nombre_cliente": "Test Client", + "cuit_pais_cliente": "12345678901", + "domicilio_cliente": "Test Address", + "id_impositivo": "Test ID", + "imp_total": "1000.00", + "moneda_id": "PES", + "moneda_ctz": "1.0", + "obs_comerciales": "Test Obs", + "obs_generales": "Test Gen Obs", + "forma_pago": "Test Payment", + "incoterms": "FOB", + "incoterms_ds": "Test Incoterms", + "idioma_cbte": "1", + }, + ) + mocker.patch.object(sys, "argv", ["/testing"]) + mocker.patch.dict( + "pyafipws.recex1.__dict__", + {"TIPOS_REG": ("0", "1", "2", "3")} + ) + + recex1.autorizar(mock_ws, mock_entrada, mock_salida) + + mock_ws.CrearFactura.assert_called_once() + + +@pytest.mark.dontusefix +def test_autorizar_with_full_input(mock_ws, mocker): + """ + Prueba la función autorizar con una entrada completa. + """ + mock_entrada = io.StringIO( + ( + "0,20230101,1,1,1,1,S,203,Test Client,12345678901,Test Address," + "Test ID,1000.00,PES,1.0,Test Obs,Test Gen Obs,Test Payment,FOB," + "Test Incoterms,1\n" + "1,PROD1,1,1,1000.00,1000.00,0.00,Test Product\n" + "2,123456,203\n" + "3,1,1,1,12345678901\n" + ) + ) + mock_salida = io.StringIO() + mocker.patch.object(mock_ws, "GetLastID", return_value=1) + mocker.patch("pyafipws.recex1.escribir_factura") + mocker.patch( + "pyafipws.recex1.leer", + side_effect=[ + { + "id": "1", + "tipo_cbte": "1", + "punto_vta": "1", + "cbte_nro": "1", + "fecha_cbte": "20230101", + }, + { + "tipo_reg": "1", + "codigo": "PROD1", + "qty": "1", + "umed": "1", + "precio": "1000.00", + "importe": "1000.00", + "bonif": "0.00", + "ds": "Test Product", + }, + {"tipo_reg": "2", "id_permiso": "123456", "dst_merc": "203"}, + { + "tipo_reg": "3", + "cbte_tipo": "1", + "cbte_punto_vta": "1", + "cbte_nro": "1", + "cbte_cuit": "12345678901", + }, + ], + ) + mocker.patch.dict( + "pyafipws.recex1.__dict__", + {"TIPOS_REG": ("0", "1", "2", "3")} + ) + + recex1.autorizar(mock_ws, mock_entrada, mock_salida) + + mock_ws.CrearFactura.assert_called_once() + mock_ws.AgregarItem.assert_called_once() + mock_ws.AgregarPermiso.assert_called_once() + mock_ws.AgregarCmpAsoc.assert_called_once() + + +@pytest.mark.dontusefix +def test_autorizar_with_error(mock_ws, mocker): + """ + Prueba la función autorizar cuando se produce un error. + """ + mock_entrada = io.StringIO( + ( + "0,20230101,1,1,1,1,S,203,Test Client,12345678901,Test Address," + "Test ID,1000.00,PES,1.0,Test Obs,Test Gen Obs,Test Payment,FOB," + "Test Incoterms,1\n" + ) + ) + mock_salida = io.StringIO() + mocker.patch.object(mock_ws, "GetLastID", return_value=1) + mocker.patch( + "pyafipws.recex1.leer", + return_value={"id": "1", "cbte_nro": "1"} + ) + mock_ws.Authorize.side_effect = Exception("Test error") + mocker.patch.dict( + "pyafipws.recex1.__dict__", + {"TIPOS_REG": ("0", "1", "2", "3")} + ) + + with pytest.raises(Exception): + recex1.autorizar(mock_ws, mock_entrada, mock_salida) + + +@pytest.mark.dontusefix +def test_escribir_factura_basic(mock_escribir): + """ + Prueba la función escribir_factura con un diccionario básico. + """ + dic = {"tipo_reg": "0", "id": "1"} + archivo = io.StringIO() + recex1.escribir_factura(dic, archivo) + + mock_escribir.assert_called_once_with(dic, recex1.ENCABEZADO) + assert dic["tipo_reg"] == recex1.TIPOS_REG[0] + assert archivo.getvalue() == "mocked_string" + + +@pytest.mark.dontusefix +def test_escribir_factura_with_all_fields(mock_escribir): + """ + Prueba la función escribir_factura con todos los campos posibles. + """ + dic = { + "id": "1", + "detalles": [{"codigo": "001"}], + "permisos": [{"id_permiso": "P001"}], + "cbtes_asoc": [{"cbte_nro": "001"}], + } + archivo = io.StringIO() + recex1.escribir_factura(dic, archivo) + + assert mock_escribir.call_count == 4 + assert archivo.getvalue() == "mocked_string" * 4 + + +@pytest.mark.dontusefix +def test_escribir_factura_with_dbf( + mock_escribir, mock_guardar_dbf, mock_conf_dbf +): + """ + Prueba la función escribir_factura con la opción DBF activada. + """ + dic = {"id": "1"} + archivo = io.StringIO() + + with patch.object(sys, "argv", ["/dbf"]): + recex1.escribir_factura(dic, archivo) + + mock_guardar_dbf.assert_called_once() + expected_formatos = [ + ("Encabezado", recex1.ENCABEZADO, [dic]), + ("Permisos", recex1.PERMISO, []), + ("Comprobante Asociado", recex1.CMP_ASOC, []), + ("Detalles", recex1.DETALLE, []), + ] + assert mock_guardar_dbf.call_args[0][0] == expected_formatos + assert archivo.getvalue() == "mocked_string" + + +@pytest.mark.dontusefix +def test_escribir_factura_without_dbf(mock_escribir, mock_guardar_dbf): + """ + Prueba la función escribir_factura sin la opción DBF. + """ + dic = {"id": "1"} + archivo = io.StringIO() + + with patch.object(sys, "argv", []): + recex1.escribir_factura(dic, archivo) + + mock_guardar_dbf.assert_not_called() + assert archivo.getvalue() == "mocked_string" + + +@pytest.mark.dontusefix +def test_escribir_factura_agrega_true( + mock_escribir, mock_guardar_dbf, mock_conf_dbf +): + """ + Prueba la función escribir_factura con la opción de agregar activada. + """ + dic = {"id": "1"} + archivo = io.StringIO() + + with patch.object(sys, "argv", ["/dbf"]): + recex1.escribir_factura(dic, archivo, agrega=True) + + mock_guardar_dbf.assert_called_once() + assert mock_guardar_dbf.call_args[0][1] is True + assert archivo.getvalue() == "mocked_string" + + +@pytest.mark.dontusefix +def test_escribir_factura_empty_dic(mock_escribir): + """ + Prueba la función escribir_factura con un diccionario vacío. + """ + dic = {} + archivo = io.StringIO() + recex1.escribir_factura(dic, archivo) + + mock_escribir.assert_called_once() + assert archivo.getvalue() == "mocked_string" + + +@pytest.mark.dontusefix +def test_escribir_factura_multiple_items(mock_escribir): + """ + Prueba la función escribir_factura con múltiples items en cada campo. + """ + dic = { + "id": "1", + "detalles": [{"codigo": "001"}, {"codigo": "002"}], + "permisos": [{"id_permiso": "P001"}, {"id_permiso": "P002"}], + "cbtes_asoc": [{"cbte_nro": "001"}, {"cbte_nro": "002"}], + } + archivo = io.StringIO() + recex1.escribir_factura(dic, archivo) + + # 1 (header) + 2 (detalles) + 2 (permisos) + 2 (cbtes_asoc) + assert mock_escribir.call_count == 7 + assert archivo.getvalue() == "mocked_string" * 7 + + +@pytest.mark.dontusefix +def test_depurar_xml(mocker): + """ + Prueba la función depurar_xml para asegurar que escribe correctamente + los archivos de solicitud y respuesta XML. + """ + mock_client = mocker.Mock() + mock_client.xml_request = "mock request" + mock_client.xml_response = b"mock response" + + mock_time = "20230101120000" + + mocker.patch("time.strftime", return_value=mock_time) + mock_open = mocker.mock_open() + mocker.patch("builtins.open", mock_open) + + recex1.depurar_xml(mock_client) + + # Verifica que se abren los archivos correctos + mock_open.assert_any_call(f"request-{mock_time}.xml", "w") + mock_open.assert_any_call(f"response-{mock_time}.xml", "w") + + # Verifica que se escribe el contenido correcto + mock_open().write.assert_any_call("mock request") + mock_open().write.assert_any_call("mock response") + + # Verifica que se llama a close() dos veces (una por cada archivo) + assert mock_open().close.call_count == 2 + + +@pytest.fixture +def mock_config(mocker): + """ + Fixture que crea un mock de la configuración. + + Simula las respuestas de un objeto de configuración para diferentes + secciones y claves. + """ + config = mocker.MagicMock() + config.get.side_effect = lambda section, key: { + ("WSAA", "CERT"): "cert.crt", + ("WSAA", "PRIVATEKEY"): "key.key", + ("WSFEXv1", "CUIT"): "20111111112", + ("WSFEXv1", "ENTRADA"): "entrada.txt", + ("WSFEXv1", "SALIDA"): "salida.txt", + ("WSFEXv1", "URL"): "https://test.url", + ("WSFEXv1", "TIMEOUT"): "30", + }.get((section, key)) + config.has_option.return_value = True + config.has_section.return_value = True + config.items.return_value = [("host", "localhost"), ("port", "8080")] + return config + + +@pytest.fixture +def mock_ws(mocker): + """ + Fixture que crea un mock del objeto WebService. + + Configura valores de retorno predeterminados para varios métodos del WS. + """ + ws = mocker.MagicMock() + ws.Dummy.return_value = True + ws.GetLastCMP.return_value = 1 + ws.GetParamCtz.return_value = 1.0 + ws.GetParamMonConCotizacion.return_value = ["USD:1.0"] + return ws + + +@pytest.fixture +def mock_wsaa(mocker): + """ + Fixture que crea un mock del objeto WSAA. + + Configura un valor de retorno predeterminado para el método Autenticar. + """ + wsaa = mocker.MagicMock() + wsaa.Autenticar.return_value = "mocked_ta" + return wsaa + + +@pytest.mark.dontusefix +def setup_mocks(mocker, mock_config, mock_ws, mock_wsaa): + """ + Configura varios mocks para las pruebas. + + Esta función aplica parches a varias funciones y métodos utilizados en + recex1.py para simular su comportamiento durante las pruebas. + """ + mocker.patch("pyafipws.recex1.abrir_conf", return_value=mock_config) + mocker.patch("pyafipws.recex1.wsfexv1.WSFEXv1", return_value=mock_ws) + mocker.patch("pyafipws.recex1.WSAA", return_value=mock_wsaa) + mocker.patch("pyafipws.recex1.autorizar") + mocker.patch("pyafipws.recex1.depurar_xml") + mocker.patch("pyafipws.recex1.escribir_factura") + mocker.patch("os.path.exists", return_value=True) + mocker.patch("os.path.getsize", return_value=100) + mocker.patch("os.path.getmtime", return_value=time.time()) + mocker.patch( + "builtins.open", + mocker.mock_open(read_data="mocked_file_content") + ) + mocker.patch("builtins.input", side_effect=["1", "1", "1"]) + + +@pytest.mark.parametrize( + "args", + [ + ["/ayuda"], + ["/dummy"], + ["/debug", "/xml", "/ult", "1", "1"], + ["/get", "1", "1", "1"], + ["/ctz", "USD"], + ["/monctz", "20230101"], + ["/prueba"], + ["/dbf"], + ["/testing"], + ], +) +@pytest.mark.dontusefix +def test_main_comprehensive(mocker, mock_config, mock_ws, mock_wsaa, args): + """ + Prueba comprehensiva de la función main de recex1. + + Esta prueba verifica el comportamiento de recex1.main() bajo diferentes + escenarios de ejecución, determinados por los argumentos proporcionados. + """ + setup_mocks(mocker, mock_config, mock_ws, mock_wsaa) + mocker.patch.object(recex1.sys, "argv", ["recex1.py"] + args) + mocker.patch("os.access", return_value=True) + mocker.patch( + "builtins.open", + mocker.mock_open(read_data="mocked_file_content") + ) + + mocker.patch( + "pyafipws.wsaa.open", + mocker.mock_open(read_data="mocked_ta_content") + ) + mocker.patch( + "os.path.exists", + return_value=True + ) + mocker.patch( + "pyafipws.wsaa.SimpleXMLElement", + return_value=mocker.MagicMock() + ) + + recex1.main() + + if "/ayuda" in args: + mock_ws.Conectar.assert_not_called() + mock_wsaa.Autenticar.assert_not_called() + else: + mock_ws.Conectar.assert_called() + + if "/prueba" in args: + mock_ws.CrearFactura.assert_called_once() + mock_ws.AgregarItem.assert_called_once() + mock_ws.AgregarPermiso.assert_called_once() + elif "/ayuda" not in args: + if not any( + arg in args + for arg in [ + "/dummy", "/ult", "/get", + "/ctz", "/monctz", "/dbf", + "/testing" + ] + ): + mock_wsaa.Autenticar.assert_called() + + mock_ws.Cuit = mock_config.get("WSFEXv1", "CUIT")