From dc91f24cf847ca45587066996ebb6064f1cad6a3 Mon Sep 17 00:00:00 2001 From: Kunal-Somani Date: Sat, 21 Mar 2026 12:37:21 +0530 Subject: [PATCH] fix(pipeline): fix malformed logger calls and add remove_audio test coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix logger.debug() calls in AudioTranscriptionSentimentPipeline.process() that were passing objects as positional args after an f-string — the extra args were silently ignored, making debug output useless. Replaced with correct % formatting: logger.debug('message: %s', obj) - Add two new unit tests for the remove_audio code path which had zero coverage — the shared fixture hardcodes remove_audio=False so os.remove() was never exercised: * test_process__remove_audio_called_when_enabled: asserts os.remove() is called exactly once with the correct audio path when remove_audio=True * test_process__remove_audio_not_called_when_disabled: asserts os.remove() is never called when remove_audio=False --- .../audio_transcription_sentiment_pipeline.py | 301 ++++----- ..._audio_transcription_sentiment_pipeline.py | 616 +++++++++--------- 2 files changed, 446 insertions(+), 471 deletions(-) diff --git a/app/services/audio_transcription_sentiment_pipeline.py b/app/services/audio_transcription_sentiment_pipeline.py index c7b641b..d38a8c4 100644 --- a/app/services/audio_transcription_sentiment_pipeline.py +++ b/app/services/audio_transcription_sentiment_pipeline.py @@ -1,162 +1,139 @@ -import os - -from app.config import Config - -from app.utils.logger import logger - -# Services -from app.services.audio_service import AudioService -from app.services.transcript_service import TranscriptService -from app.services.sentiment_service import SentimentService - -config = Config().config # Load the configuration - - -from pydantic import BaseModel -from typing import List, Union - -class TranscriptionChunk(BaseModel): - timestamp: List[int] # [start_time_ms, end_time_ms] - text: str # Text from the chunk - label: Union[str, None] = None # Sentiment label (optional) - confidence: Union[float, None] = None # Sentiment confidence score (optional) - -class AudioTranscriptionSentimentResult(BaseModel): - audio_path: str # Path to the extracted audio segment - start_time_ms: int # Start time of the segment (in milliseconds) - end_time_ms: int # End time of the segment (in milliseconds) - transcription: str # Full transcription of the audio segment - utterances_sentiment: List[TranscriptionChunk] # Sentiment analysis for each chunk - -class ErrorResponse(BaseModel): - error: str # Error message describing what went wrong - -# Union type to handle both successful and error responses -ProcessResponse = Union[AudioTranscriptionSentimentResult, ErrorResponse] - - -class AudioTranscriptionSentimentPipeline: - def __init__(self): - self.debug = config.get('debug') - - self.config = config.get('audio_transcription_sentiment_pipeline') - self.remove_audio = self.config.get('remove_audio') - - self.audio_service = AudioService() - self.transcript_service = TranscriptService() - self.sentiment_service = SentimentService() - - def process(self, url: str, start_time_ms: int, end_time_ms: int = None, user_id: str = None)-> ProcessResponse: - """ - Process the Video/Audio file by extracting a segment, transcribing it, and performing sentiment analysis. - :param url: URL or local file path to the audio file. - :param start_time_ms: Start time of the segment to extract (in milliseconds). - :param end_time_ms: End time of the segment to extract (in milliseconds). - :param user_id: (Optional) User ID for creating user-specific subdirectories - :return: Transcription, sentiment analysis, and audio segment details - """ - try: - # Step(1) Extract the audio segment - audio_result = self.audio_service.extract_audio(url, start_time_ms, end_time_ms, user_id) - - if isinstance(audio_result, dict) and 'error' in audio_result: - # If there was an error extracting the audio, return it - return { - 'error': audio_result["error"] # Return the error message - } - - if self.debug: - logger.debug(f"[debug] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [audio_result]", audio_result) - # print("[debug] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [audio_result]", audio_result) - - # Parse the audio segment details - audio_path = audio_result['audio_path'] - start_time_ms = audio_result['start_time_ms'] - end_time_ms = audio_result['end_time_ms'] - - # Step(2) Transcribe the audio segment - transcription_result = self.transcript_service.transcribe(audio_path) - - if isinstance(transcription_result, dict) and 'error' in transcription_result: - return { - 'error': transcription_result['error'] # Return the error message - } - - if self.debug: - logger.debug("[debug] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [transcription_result]", transcription_result) - # print("[debug] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [transcription_result]", transcription_result) - - # Parse the transcription details - transcription = transcription_result['transcription'] # Full transcription text - chunks = transcription_result['chunks'] # Transcription chunks [{'timestamp': (,), 'text':""}] - - - # Remove the audio file after processing - if self.remove_audio: - logger.debug(f"[debug] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] Removing audio file: {audio_path}") - # print(f"[debug] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] Removing audio file: {audio_path}") - os.remove(audio_path) - - - # Step(3) Perform sentiment [Per chunk :D] - for chunk in chunks: - timestamp = chunk['timestamp'] - text = chunk['text'] - - sentiment_result = self.sentiment_service.analyze(text) - if isinstance(sentiment_result, dict) and 'error' in sentiment_result: - logger.error(f"[error] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [sentiment_result]", sentiment_result) - # print("[error] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [sentiment_result]", sentiment_result) - chunk['error'] = sentiment_result['error'] # Add the error message to the chunk - continue # Skip this chunk if there was an error :D - - if self.debug: - logger.debug("[debug] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [sentiment_result]", sentiment_result) - # print("[debug] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [sentiment_result]", sentiment_result) - - # Add the sentiment result to the chunk - chunk['label'] = sentiment_result['label'] - chunk['confidence'] = sentiment_result['confidence'] - - # Return the transcription, sentiment analysis, and audio segment details - return { - 'audio_path': audio_path, - 'start_time_ms': start_time_ms, - 'end_time_ms': end_time_ms, - 'transcription': transcription, - 'utterances_sentiment': chunks, - } - except Exception as e: - logger.error(f"[error] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] An error occurred during processing: {str(e)}") - # print(f"[error] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] An error occurred during processing: {str(e)}") - return {'error': 'An unexpected error occurred while processing the request.'} # Generic error message - - - -# if __name__ == "__main__": -# pipeline = AudioTranscriptionSentimentPipeline() -# print("pipeline",pipeline) - - # # URL to Video File - # result = pipeline.process("https://drive.usercontent.google.com/u/2/uc?id=1BJ-0fvbc0mlDWaBGci0Ma-f1k6iElh6v", 0, 10000) - # print("result",result) - - # # Invalid URL Video - # result = pipeline.process("https://invalid-url.com/video.mp4", 0, 10000) - # print("result",result) - - # # Local Video File Path - # result = pipeline.process("./samples/sample_0.mp4", 0, 10000) - # print("result",result) - - # # Invalid Video File Path - # result = pipeline.process("./samples/non-exist.mp4", 0, 10000) - # print("result",result) - - # # Local Audio File Path - # result = pipeline.process("./samples/sample_1.mp3", 0, 10000) - # print("result",result) - - -# # Run: -# # python -m app.services.audio_transcription_sentiment_pipeline \ No newline at end of file +import os + +from app.config import Config + +from app.utils.logger import logger + +# Services +from app.services.audio_service import AudioService +from app.services.transcript_service import TranscriptService +from app.services.sentiment_service import SentimentService + +config = Config().config # Load the configuration + + +from pydantic import BaseModel +from typing import List, Union + +class TranscriptionChunk(BaseModel): + timestamp: List[int] # [start_time_ms, end_time_ms] + text: str # Text from the chunk + label: Union[str, None] = None # Sentiment label (optional) + confidence: Union[float, None] = None # Sentiment confidence score (optional) + +class AudioTranscriptionSentimentResult(BaseModel): + audio_path: str # Path to the extracted audio segment + start_time_ms: int # Start time of the segment (in milliseconds) + end_time_ms: int # End time of the segment (in milliseconds) + transcription: str # Full transcription of the audio segment + utterances_sentiment: List[TranscriptionChunk] # Sentiment analysis for each chunk + +class ErrorResponse(BaseModel): + error: str # Error message describing what went wrong + +# Union type to handle both successful and error responses +ProcessResponse = Union[AudioTranscriptionSentimentResult, ErrorResponse] + + +class AudioTranscriptionSentimentPipeline: + def __init__(self): + self.debug = config.get('debug') + + self.config = config.get('audio_transcription_sentiment_pipeline') + self.remove_audio = self.config.get('remove_audio') + + self.audio_service = AudioService() + self.transcript_service = TranscriptService() + self.sentiment_service = SentimentService() + + def process(self, url: str, start_time_ms: int, end_time_ms: int = None, user_id: str = None) -> ProcessResponse: + """ + Process the Video/Audio file by extracting a segment, transcribing it, and performing sentiment analysis. + :param url: URL or local file path to the audio file. + :param start_time_ms: Start time of the segment to extract (in milliseconds). + :param end_time_ms: End time of the segment to extract (in milliseconds). + :param user_id: (Optional) User ID for creating user-specific subdirectories + :return: Transcription, sentiment analysis, and audio segment details + """ + try: + # Step(1) Extract the audio segment + audio_result = self.audio_service.extract_audio(url, start_time_ms, end_time_ms, user_id) + + if isinstance(audio_result, dict) and 'error' in audio_result: + return { + 'error': audio_result["error"] + } + + if self.debug: + logger.debug( + "[Service Layer] [AudioTranscriptionSentimentPipeline] [process] audio_result: %s", + audio_result + ) + + # Parse the audio segment details + audio_path = audio_result['audio_path'] + start_time_ms = audio_result['start_time_ms'] + end_time_ms = audio_result['end_time_ms'] + + # Step(2) Transcribe the audio segment + transcription_result = self.transcript_service.transcribe(audio_path) + + if isinstance(transcription_result, dict) and 'error' in transcription_result: + return { + 'error': transcription_result['error'] + } + + if self.debug: + logger.debug( + "[Service Layer] [AudioTranscriptionSentimentPipeline] [process] transcription_result: %s", + transcription_result + ) + + # Parse the transcription details + transcription = transcription_result['transcription'] + chunks = transcription_result['chunks'] + + # Remove the audio file after processing + if self.remove_audio: + logger.debug( + "[Service Layer] [AudioTranscriptionSentimentPipeline] [process] Removing audio file: %s", + audio_path + ) + os.remove(audio_path) + + # Step(3) Perform sentiment analysis per chunk + for chunk in chunks: + timestamp = chunk['timestamp'] + text = chunk['text'] + + sentiment_result = self.sentiment_service.analyze(text) + if isinstance(sentiment_result, dict) and 'error' in sentiment_result: + logger.error( + "[Service Layer] [AudioTranscriptionSentimentPipeline] [process] sentiment error: %s", + sentiment_result + ) + chunk['error'] = sentiment_result['error'] + continue + + if self.debug: + logger.debug( + "[Service Layer] [AudioTranscriptionSentimentPipeline] [process] sentiment_result: %s", + sentiment_result + ) + + chunk['label'] = sentiment_result['label'] + chunk['confidence'] = sentiment_result['confidence'] + + return { + 'audio_path': audio_path, + 'start_time_ms': start_time_ms, + 'end_time_ms': end_time_ms, + 'transcription': transcription, + 'utterances_sentiment': chunks, + } + except Exception as e: + logger.error( + "[Service Layer] [AudioTranscriptionSentimentPipeline] [process] An error occurred: %s", + str(e) + ) + return {'error': 'An unexpected error occurred while processing the request.'} diff --git a/tests/unit/test_services/test_audio_transcription_sentiment_pipeline.py b/tests/unit/test_services/test_audio_transcription_sentiment_pipeline.py index 4fe8017..3abc67b 100644 --- a/tests/unit/test_services/test_audio_transcription_sentiment_pipeline.py +++ b/tests/unit/test_services/test_audio_transcription_sentiment_pipeline.py @@ -1,309 +1,307 @@ -""" -This Module contains the unit tests for the AudioTranscriptionSentimentPipeline class. -""" - -import pytest -from unittest.mock import MagicMock, patch - -# Service to be tested -from app.services.audio_transcription_sentiment_pipeline import AudioTranscriptionSentimentPipeline - -class TestAudioTranscriptionSentimentPipeline: - @pytest.fixture - def audio_transcription_sentiment_pipeline(self): - """ - Fixture to set up AudioTranscriptionSentimentPipeline instance for testing. - """ - pipeline = AudioTranscriptionSentimentPipeline() - - # Override the remove_audio attribute to prevent deletion of audio files - pipeline.remove_audio = False - return pipeline - - # Grouped tests for the `process` method - class TestProcess: - def setup_method(self): - """Set up before each test for Process class.""" - self.args = { - "url": "https://example.com/audio.mp3", - "start_time_ms": 10, - "end_time_ms": 20, - "user_id": "user123" - } - - # Mocking Services - @pytest.fixture - def mock_audio_service__extract_audio(self): - """ - Fixture to mock the AudioService class. - """ - with patch("app.services.audio_transcription_sentiment_pipeline.AudioService.extract_audio") as mock_audio_service__extract_audio: - yield mock_audio_service__extract_audio - - @pytest.fixture - def mock_transcript_service__transcribe(self): - """ - Fixture to mock the TranscriptService class. - """ - with patch("app.services.audio_transcription_sentiment_pipeline.TranscriptService.transcribe") as mock_transcript_service__transcribe: - yield mock_transcript_service__transcribe - - @pytest.fixture - def mock_sentiment_service__analyze(self): - """ - Fixture to mock the SentimentService class. - """ - with patch("app.services.audio_transcription_sentiment_pipeline.SentimentService.analyze") as mock_sentiment_service__analyze: - yield mock_sentiment_service__analyze - - @pytest.fixture - def mock_os__remove(self): - """ - Fixture to mock the os.remove method. - """ - with patch("os.remove") as mock_os__remove: - - # Override the remove_audio attribute to prevent deletion of audio files - mock_os__remove.return_value = True - yield mock_os__remove - - - def test_process__extract_audio_failure(self, audio_transcription_sentiment_pipeline, mock_audio_service__extract_audio): - """ - Test the process method when the extract_audio method fails. - """ - payload = self.args.copy() - # Setup - mock_audio_service__extract_audio.return_value = { - "error": "Mocked error message" - } - - # Run - result = audio_transcription_sentiment_pipeline.process(**payload) - - # Assert - assert result == { - 'error': "Mocked error message" - } - mock_audio_service__extract_audio.assert_called_once_with(payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id']) - - - def test_process__extract_audio_exception(self, audio_transcription_sentiment_pipeline, mock_audio_service__extract_audio): - """ - Test the process method when the extract_audio method raises an exception. - """ - payload = self.args.copy() - # Setup - mock_audio_service__extract_audio.side_effect = Exception("Mocked exception") - - # Run - result = audio_transcription_sentiment_pipeline.process(**payload) - - # Assert - assert result == { - 'error': "An unexpected error occurred while processing the request." - } - mock_audio_service__extract_audio.assert_called_once_with(payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id']) - - - def test_process__transcribe_audio_failure(self, audio_transcription_sentiment_pipeline, mock_audio_service__extract_audio, mock_transcript_service__transcribe): - """ - Test the process method when the transcribe method fails. - """ - payload = self.args.copy() - # Setup - mock_audio_service__extract_audio.return_value = { - "audio_path": "/path/to/audio.mp3", - "start_time_ms": 10, - "end_time_ms": 20 - } - mock_transcript_service__transcribe.return_value = { - "error": "Mocked error message" - } - - # Run - result = audio_transcription_sentiment_pipeline.process(**self.args) - - # Assert - assert result == { - 'error': "Mocked error message" - } - mock_audio_service__extract_audio.assert_called_once_with(payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id']) - mock_transcript_service__transcribe.assert_called_once_with("/path/to/audio.mp3") - - def test_process__transcribe_audio_exception(self, audio_transcription_sentiment_pipeline, mock_audio_service__extract_audio, mock_transcript_service__transcribe): - """ - Test the process method when the transcribe method raises an exception. - """ - payload = self.args.copy() - # Setup - mock_audio_service__extract_audio.return_value = { - "audio_path": "/path/to/audio.mp3", - "start_time_ms": 10, - "end_time_ms": 20 - } - mock_transcript_service__transcribe.side_effect = Exception("Mocked exception") - - # Run - result = audio_transcription_sentiment_pipeline.process(**payload) - - # Assert - assert result == { - 'error': "An unexpected error occurred while processing the request." - } - mock_audio_service__extract_audio.assert_called_once_with(payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id']) - mock_transcript_service__transcribe.assert_called_once_with("/path/to/audio.mp3") - - - def test_process__sentiment_analysis_failure( - self, - audio_transcription_sentiment_pipeline, - mock_audio_service__extract_audio, - mock_transcript_service__transcribe, - mock_sentiment_service__analyze - ): - """ - Test the process method when the sentiment analysis service fails for one or more chunks. - """ - payload = self.args.copy() - - # Mock extract_audio success - mock_audio_service__extract_audio.return_value = { - "audio_path": "/path/to/audio.mp3", - "start_time_ms": 10, - "end_time_ms": 20 - } - - # Mock transcribe success with multiple chunks - mock_transcript_service__transcribe.return_value = { - "transcription": "This is a test transcription.", - "chunks": [ - {"timestamp": [10, 15], "text": "First chunk"}, - {"timestamp": [15, 20], "text": "Second chunk"} - ] - } - - # Mock sentiment analysis failure for one chunk - mock_sentiment_service__analyze.side_effect = [ - {"label": "POS", "confidence": 0.9}, # First chunk succeeds - {"error": "Mocked sentiment analysis failure"} # Second chunk fails - ] - - # Run - result = audio_transcription_sentiment_pipeline.process(**payload) - - # Assert - assert result == { - 'audio_path': '/path/to/audio.mp3', - 'start_time_ms': 10, - 'end_time_ms': 20, - 'transcription': 'This is a test transcription.', - 'utterances_sentiment': [ - {'timestamp': [10, 15], 'text': 'First chunk', 'label': 'POS', 'confidence': 0.9}, - {'timestamp': [15, 20], 'text': 'Second chunk', 'error': 'Mocked sentiment analysis failure'} - ] - } - - def test_process__sentiment_analysis_exception( - self, - audio_transcription_sentiment_pipeline, - mock_audio_service__extract_audio, - mock_transcript_service__transcribe, - mock_sentiment_service__analyze - ): - """ - Test the process method when the sentiment analysis service raises an exception. - """ - payload = self.args.copy() - - # Mock extract_audio success - mock_audio_service__extract_audio.return_value = { - "audio_path": "/path/to/audio.mp3", - "start_time_ms": 10, - "end_time_ms": 20 - } - - # Mock transcribe success with multiple chunks - mock_transcript_service__transcribe.return_value = { - "transcription": "This is a test transcription.", - "chunks": [ - {"timestamp": [10, 15], "text": "First chunk"}, - {"timestamp": [15, 20], "text": "Second chunk"} - ] - } - - # Mock sentiment analysis failure for one chunk - mock_sentiment_service__analyze.side_effect = [ - Exception("Mocked sentiment analysis exception"), - {"label": "POS", "confidence": 0.9} # Second chunk succeeds - ] - - # Run - result = audio_transcription_sentiment_pipeline.process(**payload) - - # Assert - assert result == { - 'error': 'An unexpected error occurred while processing the request.' - } - assert mock_audio_service__extract_audio.called_once_with(payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id']) - assert mock_transcript_service__transcribe.called_once_with("/path/to/audio.mp3") - assert mock_sentiment_service__analyze.call_once_with("First chunk") - - - def test_process_success( - self, - audio_transcription_sentiment_pipeline, - mock_audio_service__extract_audio, - mock_transcript_service__transcribe, - mock_sentiment_service__analyze, - mock_os__remove - ): - """ - Test the process method when all services succeed. - """ - payload = self.args.copy() - - # Mock extract_audio success - mock_audio_service__extract_audio.return_value = { - "audio_path": "/path/to/audio.mp3", - "start_time_ms": 10, - "end_time_ms": 20 - } - - # Mock transcribe success with multiple chunks - mock_transcript_service__transcribe.return_value = { - "transcription": "This is a test transcription.", - "chunks": [ - {"timestamp": [10, 15], "text": "First chunk"}, - {"timestamp": [15, 20], "text": "Second chunk"} - ] - } - - # Mock sentiment analysis success for all chunks - mock_sentiment_service__analyze.side_effect = [ - {"label": "POS", "confidence": 0.9}, - {"label": "NEG", "confidence": 0.8} - ] - - # Run - result = audio_transcription_sentiment_pipeline.process(**payload) - - # Assert - assert result == { - 'audio_path': '/path/to/audio.mp3', - 'start_time_ms': 10, - 'end_time_ms': 20, - 'transcription': 'This is a test transcription.', - 'utterances_sentiment': [ - {'timestamp': [10, 15], 'text': 'First chunk', 'label': 'POS', 'confidence': 0.9}, - {'timestamp': [15, 20], 'text': 'Second chunk', 'label': 'NEG', 'confidence': 0.8} - ] - } - assert mock_audio_service__extract_audio.called_once_with(payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id']) - assert mock_transcript_service__transcribe.called_once_with("/path/to/audio.mp3") - assert mock_sentiment_service__analyze.call_count == 2 - assert mock_os__remove.called == False - - -# # Run: -# coverage run -m pytest .\tests\unit\test_services\test_audio_transcription_sentiment_pipeline.py \ No newline at end of file +""" +This Module contains the unit tests for the AudioTranscriptionSentimentPipeline class. +""" + +import pytest +from unittest.mock import MagicMock, patch + +# Service to be tested +from app.services.audio_transcription_sentiment_pipeline import AudioTranscriptionSentimentPipeline + +class TestAudioTranscriptionSentimentPipeline: + @pytest.fixture + def audio_transcription_sentiment_pipeline(self): + """ + Fixture to set up AudioTranscriptionSentimentPipeline instance for testing. + remove_audio is False by default — tests that require True set it explicitly. + """ + pipeline = AudioTranscriptionSentimentPipeline() + pipeline.remove_audio = False + return pipeline + + # Grouped tests for the `process` method + class TestProcess: + def setup_method(self): + """Set up before each test for Process class.""" + self.args = { + "url": "https://example.com/audio.mp3", + "start_time_ms": 10, + "end_time_ms": 20, + "user_id": "user123" + } + + # Mocking Services + @pytest.fixture + def mock_audio_service__extract_audio(self): + with patch("app.services.audio_transcription_sentiment_pipeline.AudioService.extract_audio") as mock: + yield mock + + @pytest.fixture + def mock_transcript_service__transcribe(self): + with patch("app.services.audio_transcription_sentiment_pipeline.TranscriptService.transcribe") as mock: + yield mock + + @pytest.fixture + def mock_sentiment_service__analyze(self): + with patch("app.services.audio_transcription_sentiment_pipeline.SentimentService.analyze") as mock: + yield mock + + @pytest.fixture + def mock_os__remove(self): + with patch("app.services.audio_transcription_sentiment_pipeline.os.remove") as mock: + mock.return_value = True + yield mock + + # --- Existing tests (unchanged) --- + + def test_process__extract_audio_failure(self, audio_transcription_sentiment_pipeline, mock_audio_service__extract_audio): + payload = self.args.copy() + mock_audio_service__extract_audio.return_value = {"error": "Mocked error message"} + + result = audio_transcription_sentiment_pipeline.process(**payload) + + assert result == {'error': "Mocked error message"} + mock_audio_service__extract_audio.assert_called_once_with( + payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id'] + ) + + def test_process__extract_audio_exception(self, audio_transcription_sentiment_pipeline, mock_audio_service__extract_audio): + payload = self.args.copy() + mock_audio_service__extract_audio.side_effect = Exception("Mocked exception") + + result = audio_transcription_sentiment_pipeline.process(**payload) + + assert result == {'error': "An unexpected error occurred while processing the request."} + mock_audio_service__extract_audio.assert_called_once_with( + payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id'] + ) + + def test_process__transcribe_audio_failure(self, audio_transcription_sentiment_pipeline, mock_audio_service__extract_audio, mock_transcript_service__transcribe): + payload = self.args.copy() + mock_audio_service__extract_audio.return_value = { + "audio_path": "/path/to/audio.mp3", + "start_time_ms": 10, + "end_time_ms": 20 + } + mock_transcript_service__transcribe.return_value = {"error": "Mocked error message"} + + result = audio_transcription_sentiment_pipeline.process(**self.args) + + assert result == {'error': "Mocked error message"} + mock_audio_service__extract_audio.assert_called_once_with( + payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id'] + ) + mock_transcript_service__transcribe.assert_called_once_with("/path/to/audio.mp3") + + def test_process__transcribe_audio_exception(self, audio_transcription_sentiment_pipeline, mock_audio_service__extract_audio, mock_transcript_service__transcribe): + payload = self.args.copy() + mock_audio_service__extract_audio.return_value = { + "audio_path": "/path/to/audio.mp3", + "start_time_ms": 10, + "end_time_ms": 20 + } + mock_transcript_service__transcribe.side_effect = Exception("Mocked exception") + + result = audio_transcription_sentiment_pipeline.process(**payload) + + assert result == {'error': "An unexpected error occurred while processing the request."} + mock_audio_service__extract_audio.assert_called_once_with( + payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id'] + ) + mock_transcript_service__transcribe.assert_called_once_with("/path/to/audio.mp3") + + def test_process__sentiment_analysis_failure( + self, + audio_transcription_sentiment_pipeline, + mock_audio_service__extract_audio, + mock_transcript_service__transcribe, + mock_sentiment_service__analyze + ): + payload = self.args.copy() + mock_audio_service__extract_audio.return_value = { + "audio_path": "/path/to/audio.mp3", + "start_time_ms": 10, + "end_time_ms": 20 + } + mock_transcript_service__transcribe.return_value = { + "transcription": "This is a test transcription.", + "chunks": [ + {"timestamp": [10, 15], "text": "First chunk"}, + {"timestamp": [15, 20], "text": "Second chunk"} + ] + } + mock_sentiment_service__analyze.side_effect = [ + {"label": "POS", "confidence": 0.9}, + {"error": "Mocked sentiment analysis failure"} + ] + + result = audio_transcription_sentiment_pipeline.process(**payload) + + assert result == { + 'audio_path': '/path/to/audio.mp3', + 'start_time_ms': 10, + 'end_time_ms': 20, + 'transcription': 'This is a test transcription.', + 'utterances_sentiment': [ + {'timestamp': [10, 15], 'text': 'First chunk', 'label': 'POS', 'confidence': 0.9}, + {'timestamp': [15, 20], 'text': 'Second chunk', 'error': 'Mocked sentiment analysis failure'} + ] + } + + def test_process__sentiment_analysis_exception( + self, + audio_transcription_sentiment_pipeline, + mock_audio_service__extract_audio, + mock_transcript_service__transcribe, + mock_sentiment_service__analyze + ): + payload = self.args.copy() + mock_audio_service__extract_audio.return_value = { + "audio_path": "/path/to/audio.mp3", + "start_time_ms": 10, + "end_time_ms": 20 + } + mock_transcript_service__transcribe.return_value = { + "transcription": "This is a test transcription.", + "chunks": [ + {"timestamp": [10, 15], "text": "First chunk"}, + {"timestamp": [15, 20], "text": "Second chunk"} + ] + } + mock_sentiment_service__analyze.side_effect = [ + Exception("Mocked sentiment analysis exception"), + {"label": "POS", "confidence": 0.9} + ] + + result = audio_transcription_sentiment_pipeline.process(**payload) + + assert result == {'error': 'An unexpected error occurred while processing the request.'} + assert mock_audio_service__extract_audio.called_once_with( + payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id'] + ) + assert mock_transcript_service__transcribe.called_once_with("/path/to/audio.mp3") + assert mock_sentiment_service__analyze.call_once_with("First chunk") + + def test_process_success( + self, + audio_transcription_sentiment_pipeline, + mock_audio_service__extract_audio, + mock_transcript_service__transcribe, + mock_sentiment_service__analyze, + mock_os__remove + ): + payload = self.args.copy() + mock_audio_service__extract_audio.return_value = { + "audio_path": "/path/to/audio.mp3", + "start_time_ms": 10, + "end_time_ms": 20 + } + mock_transcript_service__transcribe.return_value = { + "transcription": "This is a test transcription.", + "chunks": [ + {"timestamp": [10, 15], "text": "First chunk"}, + {"timestamp": [15, 20], "text": "Second chunk"} + ] + } + mock_sentiment_service__analyze.side_effect = [ + {"label": "POS", "confidence": 0.9}, + {"label": "NEG", "confidence": 0.8} + ] + + result = audio_transcription_sentiment_pipeline.process(**payload) + + assert result == { + 'audio_path': '/path/to/audio.mp3', + 'start_time_ms': 10, + 'end_time_ms': 20, + 'transcription': 'This is a test transcription.', + 'utterances_sentiment': [ + {'timestamp': [10, 15], 'text': 'First chunk', 'label': 'POS', 'confidence': 0.9}, + {'timestamp': [15, 20], 'text': 'Second chunk', 'label': 'NEG', 'confidence': 0.8} + ] + } + assert mock_audio_service__extract_audio.called_once_with( + payload['url'], payload['start_time_ms'], payload['end_time_ms'], payload['user_id'] + ) + assert mock_transcript_service__transcribe.called_once_with("/path/to/audio.mp3") + assert mock_sentiment_service__analyze.call_count == 2 + assert mock_os__remove.called == False + + # --- NEW: remove_audio=True coverage --- + + def test_process__remove_audio_called_when_enabled( + self, + audio_transcription_sentiment_pipeline, + mock_audio_service__extract_audio, + mock_transcript_service__transcribe, + mock_sentiment_service__analyze, + mock_os__remove + ): + """ + Test that os.remove() is called with the correct audio path + when remove_audio is set to True. + Previously untested — the shared fixture hardcodes remove_audio=False. + """ + # Enable remove_audio for this test only + audio_transcription_sentiment_pipeline.remove_audio = True + + payload = self.args.copy() + mock_audio_service__extract_audio.return_value = { + "audio_path": "/path/to/audio.mp3", + "start_time_ms": 10, + "end_time_ms": 20 + } + mock_transcript_service__transcribe.return_value = { + "transcription": "This is a test transcription.", + "chunks": [ + {"timestamp": [10, 15], "text": "First chunk"} + ] + } + mock_sentiment_service__analyze.return_value = {"label": "POS", "confidence": 0.9} + + result = audio_transcription_sentiment_pipeline.process(**payload) + + # os.remove must be called exactly once with the audio path + mock_os__remove.assert_called_once_with("/path/to/audio.mp3") + + assert result == { + 'audio_path': '/path/to/audio.mp3', + 'start_time_ms': 10, + 'end_time_ms': 20, + 'transcription': 'This is a test transcription.', + 'utterances_sentiment': [ + {'timestamp': [10, 15], 'text': 'First chunk', 'label': 'POS', 'confidence': 0.9} + ] + } + + def test_process__remove_audio_not_called_when_disabled( + self, + audio_transcription_sentiment_pipeline, + mock_audio_service__extract_audio, + mock_transcript_service__transcribe, + mock_sentiment_service__analyze, + mock_os__remove + ): + """ + Test that os.remove() is NOT called when remove_audio is False. + Complements test_process__remove_audio_called_when_enabled. + """ + audio_transcription_sentiment_pipeline.remove_audio = False + + payload = self.args.copy() + mock_audio_service__extract_audio.return_value = { + "audio_path": "/path/to/audio.mp3", + "start_time_ms": 10, + "end_time_ms": 20 + } + mock_transcript_service__transcribe.return_value = { + "transcription": "This is a test transcription.", + "chunks": [ + {"timestamp": [10, 15], "text": "First chunk"} + ] + } + mock_sentiment_service__analyze.return_value = {"label": "NEU", "confidence": 0.7} + + audio_transcription_sentiment_pipeline.process(**payload) + + mock_os__remove.assert_not_called()