From 821ce284f2d8900bfd0aec8d291caff79c1c35c5 Mon Sep 17 00:00:00 2001 From: Kunal-Somani Date: Sat, 21 Mar 2026 15:47:45 +0530 Subject: [PATCH] fix(logging): fix f-string logger calls and add missing boundary tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix logger.error() calls in AudioService and TranscriptService that were using f-strings with str(e) — replaced with % formatting which is Python logging best practice and avoids string interpolation cost when the log level is disabled - Add three boundary tests to TestAudioService that were missing from the original test suite: * test_extract_audio_start_time_zero: start_time_ms=0 is a valid lower boundary but was never tested — the guard only rejects < 0 * test_extract_audio_float_start_time: validation explicitly allows isinstance(start_time_ms, (int, float)) but no test exercised a float value passing through successfully * test_extract_audio_no_user_id: _save_audio unit tests cover the None branch internally but extract_audio was never called with user_id=None end-to-end — verifies None is forwarded correctly to _save_audio rather than silently dropped --- app/services/audio_service.py | 243 +++++++----------- app/services/transcript_service.py | 117 ++++----- .../unit/test_services/test_audio_service.py | 140 +++++++++- 3 files changed, 290 insertions(+), 210 deletions(-) diff --git a/app/services/audio_service.py b/app/services/audio_service.py index a7eb144..5ce9885 100644 --- a/app/services/audio_service.py +++ b/app/services/audio_service.py @@ -1,143 +1,100 @@ -""" -This module contains the service layer for extracting audio segments. -""" -import uuid -import os -from pydub import AudioSegment - -from app.config import Config - -from app.utils.logger import logger - -# Data layer for fetching audio files -from app.data.audio_data import AudioDataLayer - -config = Config().config # Load the configuration - -class AudioService: - def __init__(self,static_folder="static/audio"): - self.debug = config.get('debug') - - self.audio_data_layer = AudioDataLayer(config) - self.static_folder = static_folder - - def extract_audio(self, url: str, start_time_ms: int, end_time_ms: int = None, user_id: str = None): - """ - Extract a segment from the audio file. - :param url: URL or local file path to the audio file. - :param start_time_ms: Start time of the segment to extract (in milliseconds). - :param end_time_ms: End time of the segment to extract (in milliseconds). - :param user_id: (Optional) User ID for creating user-specific subdirectories. - :return: Path to the saved audio file or error message - """ - try: - # Validate start_time_ms (must be a non-negative number) - if not isinstance(start_time_ms, (int, float)) or start_time_ms < 0: - return { - 'error': 'Start time must be a non-negative number.' - } - - # Fetch the audio file using the AudioDataLayer [Data Layer] - audio = self.audio_data_layer.fetch_audio(url) - - if isinstance(audio, dict) and 'error' in audio: - # If there was an error fetching the audio, return it - return { - 'error': audio['error'] # Return the error message - } - - # If end_time_ms is None, set it to the length of the audio file - if end_time_ms is None or end_time_ms > len(audio): - end_time_ms = len(audio) - - # Validate that end_time_ms is not less than start_time_ms - if end_time_ms < start_time_ms: - return { - 'error': 'End time must not be less than start time.' - } - - # Extract the segment from the audio - extracted_audio = audio[start_time_ms:end_time_ms] - - # Save the extracted audio (with a unique filename) - file_path = self._save_audio(extracted_audio, user_id) - - # Return the file path or URL to access the audio - return { - "audio_path": file_path, - "start_time_ms": start_time_ms, - "end_time_ms": end_time_ms - } - - except Exception as e: - # Catch any other exceptions - logger.error(f"[error] [Service Layer] [AudioService] [extract_audio] An error occurred during the audio extraction: {str(e)}") - # print(f"[error] [Service Layer] [AudioService] [extract_audio] An error occurred during the audio extraction: {str(e)}") - return {'error': 'An unexpected error occurred while processing the request.'} # Generic error message - - - def _save_audio(self, audio: AudioSegment, user_id: str = None): - """ - Save the audio segment with a unique filename. - :param audio: The audio segment to save. - :param user_id: (Optional) User ID for creating user-specific subdirectories. - :return: The path to the saved audio file. - """ - # Generate a unique filename using UUID - unique_filename = f"{str(uuid.uuid4())}_audio.mp3" - - # Optionally, create a user-specific subdirectory if user_id is provided - if user_id: - user_folder = os.path.join(self.static_folder, user_id).replace("\\", "/") - os.makedirs(user_folder, exist_ok=True) - file_path = f"{user_folder}/{unique_filename}" - else: - os.makedirs(self.static_folder, exist_ok=True) - file_path = f"{self.static_folder}/{unique_filename}" - - - # Export the audio to the file path - audio.export(file_path, format="mp3") - - return file_path - - -# if __name__ == "__main__": -# service = AudioService() - - # # start_time_ms < 0 - # audio = service.extract_audio("https://drive.usercontent.google.com/u/2/uc?id=1BJ-0fvbc0mlDWaBGci0Ma-f1k6iElh6v", -100,100) - # print("audio",audio) - - # # Invalid URL - # audio = service.extract_audio("https://invalid-url.com/audio.mp3", 0, 5000) - # print("audio",audio) - - # # Invalid local file - # audio = service.extract_audio("./samples/non-exist.mp4",0,5000) - # print("audio",audio) - - # # end_time_ms is None - # audio = service.extract_audio("https://drive.usercontent.google.com/u/2/uc?id=1BJ-0fvbc0mlDWaBGci0Ma-f1k6iElh6v", 0) - # print("audio",audio) - - # # end_time_ms > len(audio) - # audio = service.extract_audio("https://drive.usercontent.google.com/u/2/uc?id=1BJ-0fvbc0mlDWaBGci0Ma-f1k6iElh6v", 0, 500000000) - # print("audio",audio) - - # # start_time_ms > end_time_ms - # audio = service.extract_audio("https://drive.usercontent.google.com/u/2/uc?id=1BJ-0fvbc0mlDWaBGci0Ma-f1k6iElh6v", 500,100) - # print("audio",audio) - - # # From URL (Normal Case) - # result = service.extract_audio("https://drive.usercontent.google.com/u/2/uc?id=1BJ-0fvbc0mlDWaBGci0Ma-f1k6iElh6v", 0, 5000) - # print(result) - - # # From local file (Normal Case) - # audio = service.extract_audio("./samples/sample_0.mp4",0,5000) - # print("audio",audio) - - - -# # Run: -# # python -m app.services.audio_service \ No newline at end of file +""" +This module contains the service layer for extracting audio segments. +""" +import uuid +import os +from pydub import AudioSegment + +from app.config import Config + +from app.utils.logger import logger + +# Data layer for fetching audio files +from app.data.audio_data import AudioDataLayer + +config = Config().config # Load the configuration + +class AudioService: + def __init__(self, static_folder="static/audio"): + self.debug = config.get('debug') + + self.audio_data_layer = AudioDataLayer(config) + self.static_folder = static_folder + + def extract_audio(self, url: str, start_time_ms: int, end_time_ms: int = None, user_id: str = None): + """ + Extract a segment from the audio file. + :param url: URL or local file path to the audio file. + :param start_time_ms: Start time of the segment to extract (in milliseconds). + :param end_time_ms: End time of the segment to extract (in milliseconds). + :param user_id: (Optional) User ID for creating user-specific subdirectories. + :return: Path to the saved audio file or error message + """ + try: + # Validate start_time_ms (must be a non-negative number) + if not isinstance(start_time_ms, (int, float)) or start_time_ms < 0: + return { + 'error': 'Start time must be a non-negative number.' + } + + # Fetch the audio file using the AudioDataLayer [Data Layer] + audio = self.audio_data_layer.fetch_audio(url) + + if isinstance(audio, dict) and 'error' in audio: + return { + 'error': audio['error'] + } + + # If end_time_ms is None, set it to the length of the audio file + if end_time_ms is None or end_time_ms > len(audio): + end_time_ms = len(audio) + + # Validate that end_time_ms is not less than start_time_ms + if end_time_ms < start_time_ms: + return { + 'error': 'End time must not be less than start time.' + } + + # Extract the segment from the audio + extracted_audio = audio[start_time_ms:end_time_ms] + + # Save the extracted audio (with a unique filename) + file_path = self._save_audio(extracted_audio, user_id) + + return { + "audio_path": file_path, + "start_time_ms": start_time_ms, + "end_time_ms": end_time_ms + } + + except Exception as e: + logger.error( + "[Service Layer] [AudioService] [extract_audio] An error occurred: %s", + str(e) + ) + return {'error': 'An unexpected error occurred while processing the request.'} + + + def _save_audio(self, audio: AudioSegment, user_id: str = None): + """ + Save the audio segment with a unique filename. + :param audio: The audio segment to save. + :param user_id: (Optional) User ID for creating user-specific subdirectories. + :return: The path to the saved audio file. + """ + # Generate a unique filename using UUID + unique_filename = f"{str(uuid.uuid4())}_audio.mp3" + + # Optionally, create a user-specific subdirectory if user_id is provided + if user_id: + user_folder = os.path.join(self.static_folder, user_id).replace("\\", "/") + os.makedirs(user_folder, exist_ok=True) + file_path = f"{user_folder}/{unique_filename}" + else: + os.makedirs(self.static_folder, exist_ok=True) + file_path = f"{self.static_folder}/{unique_filename}" + + # Export the audio to the file path + audio.export(file_path, format="mp3") + + return file_path diff --git a/app/services/transcript_service.py b/app/services/transcript_service.py index 797c50e..4a30a18 100644 --- a/app/services/transcript_service.py +++ b/app/services/transcript_service.py @@ -1,66 +1,51 @@ -""" -This module contains the service layer for transcribing audio files. -""" -import os -from app.config import Config - -from app.utils.logger import logger - -# Data Layer for fetching and processing transcripts -from app.data.transcript_data import TranscriptDataLayer - -config = Config().config # Load the configuration - -class TranscriptService: - def __init__(self): - self.debug = config.get('debug') - - self.transcript_data_layer = TranscriptDataLayer(config) - - def transcribe(self, audio_file_path: str) -> tuple: - """ - Transcribe the given audio file. - :param audio_file_path: Path to the audio file - :return: Transcribed text and chunks - """ - try: - # Check if the audio file exists - if not os.path.exists(audio_file_path) or not os.path.isfile(audio_file_path): - return { - 'error': f'Audio file not found: {audio_file_path}' - } - - result = self.transcript_data_layer.transcribe(audio_file_path) - - if isinstance(result, dict) and 'error' in result: - return { - 'error': result['error'] - } - - # Return the transcribed text and chunks - return { - 'transcription': result['transcription'], - 'chunks': result['chunks'] - } - - except Exception as e: - # Catch any other exceptions - logger.error(f"[error] [Service Layer] [TranscriptService] [transcribe] An error occurred during transcription: {str(e)}") - # print(f"[error] [Service Layer] [TranscriptService] [transcribe] An error occurred during transcription: {str(e)}") - return {'error': 'An unexpected error occurred while processing the request.'} # Generic error message - - -# if __name__ == "__main__": -# transcript_service = TranscriptService() -# print("transcript_service",transcript_service) - -# # Normal Case -# result = transcript_service.transcribe("./samples/sample_1.mp3") -# print("result",result) - - # # File doesn't exist - # result = transcript_service.transcribe("./samples/non_exist_file.mp3") - # print("result",result) - -# # Run: -# # python -m app.services.transcript_service \ No newline at end of file +""" +This module contains the service layer for transcribing audio files. +""" +import os +from app.config import Config + +from app.utils.logger import logger + +# Data Layer for fetching and processing transcripts +from app.data.transcript_data import TranscriptDataLayer + +config = Config().config # Load the configuration + +class TranscriptService: + def __init__(self): + self.debug = config.get('debug') + + self.transcript_data_layer = TranscriptDataLayer(config) + + def transcribe(self, audio_file_path: str) -> tuple: + """ + Transcribe the given audio file. + :param audio_file_path: Path to the audio file + :return: Transcribed text and chunks + """ + try: + # Check if the audio file exists + if not os.path.exists(audio_file_path) or not os.path.isfile(audio_file_path): + return { + 'error': f'Audio file not found: {audio_file_path}' + } + + result = self.transcript_data_layer.transcribe(audio_file_path) + + if isinstance(result, dict) and 'error' in result: + return { + 'error': result['error'] + } + + # Return the transcribed text and chunks + return { + 'transcription': result['transcription'], + 'chunks': result['chunks'] + } + + except Exception as e: + logger.error( + "[Service Layer] [TranscriptService] [transcribe] An error occurred: %s", + str(e) + ) + return {'error': 'An unexpected error occurred while processing the request.'} diff --git a/tests/unit/test_services/test_audio_service.py b/tests/unit/test_services/test_audio_service.py index bb83e9b..070bd06 100644 --- a/tests/unit/test_services/test_audio_service.py +++ b/tests/unit/test_services/test_audio_service.py @@ -327,4 +327,142 @@ def test_save_audio_export_failure(self,audio_service,mock_uuid,mock_os__makedir # # Run: -# coverage run -m pytest .\tests\unit\test_services\test_audio_service.py \ No newline at end of file +# coverage run -m pytest .\tests\unit\test_services\test_audio_service.py + +# ── NEW BOUNDARY TESTS ──────────────────────────────────────────────────────── + +class TestExtractAudioBoundary: + """ + Boundary and edge-case tests for AudioService.extract_audio() that were + missing from the original test suite. + + Three gaps covered: + 1. start_time_ms=0 — valid lower boundary, previously untested + 2. start_time_ms as float — the validation explicitly allows float via + isinstance(start_time_ms, (int, float)) but no test exercised this path + 3. user_id=None through the full extract_audio flow — _save_audio unit + tests cover the None branch internally, but extract_audio never passed + user_id=None end-to-end + """ + + @pytest.fixture + def audio_service(self): + return AudioService(static_folder="mock_static") + + @pytest.fixture + def mock_audio_data_layer__fetch_audio(self): + with patch('app.services.audio_service.AudioDataLayer.fetch_audio') as mock: + yield mock + + @pytest.fixture + def mock__save_audio(self): + with patch('app.services.audio_service.AudioService._save_audio') as mock: + mock.return_value = "mock_audio_path" + yield mock + + def test_extract_audio_start_time_zero( + self, + audio_service, + mock_audio_data_layer__fetch_audio, + mock__save_audio + ): + """ + start_time_ms=0 is a valid lower boundary. + The guard clause rejects start_time_ms < 0, so 0 must pass through + and produce a successful result. + """ + mock_audio = MagicMock() + mocked_audio_length = 5000 + mock_audio_data_layer__fetch_audio.return_value = mock_audio + mock_audio_data_layer__fetch_audio.return_value.__len__.return_value = mocked_audio_length + + expected_slice = "mocked_audio[0:1000]" + mock_audio.__getitem__.return_value = expected_slice + + result = audio_service.extract_audio( + url="https://example.com/audio.mp3", + start_time_ms=0, + end_time_ms=1000, + user_id="user123" + ) + + mock_audio_data_layer__fetch_audio.assert_called_once_with("https://example.com/audio.mp3") + mock_audio.__getitem__.assert_called_once_with(slice(0, 1000)) + mock__save_audio.assert_called_once_with(expected_slice, "user123") + assert result == { + "audio_path": "mock_audio_path", + "start_time_ms": 0, + "end_time_ms": 1000 + } + + def test_extract_audio_float_start_time( + self, + audio_service, + mock_audio_data_layer__fetch_audio, + mock__save_audio + ): + """ + The validation uses isinstance(start_time_ms, (int, float)), explicitly + allowing floats, but no test verified a float value is accepted. + A non-negative float must pass validation and produce a successful result. + """ + mock_audio = MagicMock() + mocked_audio_length = 5000 + mock_audio_data_layer__fetch_audio.return_value = mock_audio + mock_audio_data_layer__fetch_audio.return_value.__len__.return_value = mocked_audio_length + + expected_slice = "mocked_audio[10.5:1000]" + mock_audio.__getitem__.return_value = expected_slice + + result = audio_service.extract_audio( + url="https://example.com/audio.mp3", + start_time_ms=10.5, + end_time_ms=1000, + user_id="user123" + ) + + mock_audio_data_layer__fetch_audio.assert_called_once_with("https://example.com/audio.mp3") + mock_audio.__getitem__.assert_called_once_with(slice(10.5, 1000)) + mock__save_audio.assert_called_once_with(expected_slice, "user123") + assert result == { + "audio_path": "mock_audio_path", + "start_time_ms": 10.5, + "end_time_ms": 1000 + } + + def test_extract_audio_no_user_id( + self, + audio_service, + mock_audio_data_layer__fetch_audio, + mock__save_audio + ): + """ + user_id=None must flow through extract_audio and be passed to + _save_audio correctly. The _save_audio unit tests cover the None + branch internally but extract_audio itself was never called without + a user_id in the end-to-end flow tests. + """ + mock_audio = MagicMock() + mocked_audio_length = 5000 + mock_audio_data_layer__fetch_audio.return_value = mock_audio + mock_audio_data_layer__fetch_audio.return_value.__len__.return_value = mocked_audio_length + + expected_slice = "mocked_audio[10:1000]" + mock_audio.__getitem__.return_value = expected_slice + + result = audio_service.extract_audio( + url="https://example.com/audio.mp3", + start_time_ms=10, + end_time_ms=1000, + user_id=None + ) + + mock_audio_data_layer__fetch_audio.assert_called_once_with("https://example.com/audio.mp3") + mock_audio.__getitem__.assert_called_once_with(slice(10, 1000)) + # user_id=None must be forwarded to _save_audio, not silently dropped + mock__save_audio.assert_called_once_with(expected_slice, None) + assert result == { + "audio_path": "mock_audio_path", + "start_time_ms": 10, + "end_time_ms": 1000 + }