Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 34 additions & 19 deletions app/data/sentiment_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,29 @@ def __init__(self, config: dict):
else:
raise ValueError(f"Unsupported sentiment analysis model: {self.default_model}")

def analyze(self, text: str) -> tuple:
def analyze(self, texts: str) -> list:
"""
Perform sentiment analysis on the given text.
:param text: Input text for sentiment analysis.
:return: Model outputs, probabilities, predicted label, and confidence score.
Perform sentiment analysis on the given text or list of texts.

:param text: Input text or list of texts for sentiment analysis.
:return: Dictionary for single input or list of dictionaries for batch input.
"""
try:
outputs, probabilities, predicted_label, confidence = self.model(text)
return {
# 'outputs': outputs,
# 'probabilities': probabilities,
'label': predicted_label,
'confidence': confidence
}

batch_results = self.model(texts)

results = []
for res in batch_results:
results.append({
'label': res['label'],
'confidence': round(float(res['confidence']), 2)
})


if isinstance(texts, str):
return results[0]

return results

except Exception as e:
logger.error(f"[error] [Data Layer] [SentimentDataLayer] [analyze] An error occurred during sentiment analysis: {str(e)}")
# print(f"[error] [Data Layer] [SentimentDataLayer] [analyze] An error occurred during sentiment analysis: {str(e)}")
Expand All @@ -57,13 +65,20 @@ def analyze(self, text: str) -> tuple:
# }
# }
# }
# print("config",config)
# sentiment_data = SentimentDataLayer(config)
# print("sentiment_data",sentiment_data)

# print(sentiment_data.analyze("I love this product!"))
# print(sentiment_data.analyze("I hate this product!"))
# print(sentiment_data.analyze("I am neutral about this product."))
# test_batch = [
# "I love this product!",
# "I hate this product!",
# "I am neutral about this product."
# ]

# print("\n--- Testing Batch Inference ---")
# results = sentiment_data.analyze(test_batch)
# print(results)

# # Run:
# # python -m app.data.sentiment_data
# print("\n--- Testing Single Inference ---")
# result = sentiment_data.analyze("I love this product!")
# print(result)
# Run:
# python -m app.data.sentiment_data
39 changes: 28 additions & 11 deletions app/models/bertweet_model.py
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you modify this file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to my understanding of the issue, I modified bertweet_model.py to implement Tensor-based batching directly within the model's forward pass.

Handling batches at the model level (using padding=True and truncation=True) leverages PyTorch's parallelism, making inference significantly faster than looping over strings. I've also ensured the method remains backward-compatible, so it still works perfectly for single strings (as it used to) while supporting lists of strings.

Regarding the logs, I've removed the initial debug logs to keep the PR focused and clean.

Screenshot 2026-03-21 051338

Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,42 @@ def __init__(self,config: dict)->None:
else:
self.class_labels = None

def forward(self, texts) -> list:
    """
    Perform sentiment analysis on a single text or a batch of texts.

    Args:
        texts (str | list[str]): Input text, or list of texts, to classify.
            A lone string is treated as a batch of one.

    Returns:
        list: One dict per input text, in input order, with keys:
            - 'text' (str): the original input text.
            - 'label': the predicted class label looked up in self.class_labels.
            - 'confidence' (float): softmax probability of the predicted class.

    NOTE(review): assumes self.class_labels is indexable by class id — __init__
    sets it to None when no labels are configured; confirm callers never reach
    here in that state.
    """
    # Normalize a single string into a one-element batch for uniform handling.
    if isinstance(texts, str):
        texts = [texts]

    # Tokenize the whole batch at once; padding/truncation gives one rectangular
    # tensor so the model runs a single parallel forward pass.
    inputs = self.tokenizer(texts, return_tensors="pt", truncation=True, padding=True).to(self.device)

    # Inference only: disable autograd so no gradient graph is built.
    # Outputs are reduced to plain floats via .item() below, so nothing
    # downstream could backpropagate through them anyway — this only saves
    # memory and time.
    with torch.no_grad():
        outputs = self.model(**inputs)

    # Convert logits to per-class probabilities.
    probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
    # Highest probability and its class index, per input row.
    confidences, predicted_classes = torch.max(probabilities, dim=1)

    # Map each prediction back to its source text.
    results = []
    for text, cls, conf in zip(texts, predicted_classes, confidences):
        results.append({
            "text": text,
            "label": self.class_labels[cls.item()],
            "confidence": conf.item(),
        })

    return results


# if __name__ == "__main__":
Expand Down Expand Up @@ -88,5 +98,12 @@ def forward(self,text)->tuple:
# text = "Hi how are u?"
# print(model(text))

# test_texts = [
# "I love the new features of the app!",
# "I hate the new features of the app!",
# "Hi how are u?"
# ]

# print(model(test_texts))
# # Run:
# # python -m app.models.bertweet_model
62 changes: 61 additions & 1 deletion app/routes/sentiment_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ def register_routes(api):
})) # Embed the data model
})

# ── Models for /analyze/batch ──────────────────────────────────────────────
# Request payload: a required, non-empty list of strings to analyze.
sentiment_analyze_batch_request_model = api.model('SentimentAnalyzeBatchRequestModel', {
    'texts': fields.List(fields.String, required=True, description='List of texts for sentiment analysis.', example=['I love this!', 'I hate this!'])
})

# Per-item result schema, registered once and nested in the success envelope.
_sentiment_analyze_batch_item_model = api.model('SentimentAnalyzeBatchItemModel', {
    'label': fields.String(required=True, description='Predicted sentiment label.', enum=['POS', 'NEG', 'NEU'], example='POS'),
    'confidence': fields.Float(required=True, description='Confidence score of the prediction.', example=0.95)
})

# Success envelope: status flag plus one result dict per submitted text.
sentiment_analyze_batch_success_model = api.model('SentimentAnalyzeBatchSuccessModel', {
    'status': fields.String(required=True, description='The status of the response', example='success'),
    'data': fields.List(fields.Nested(_sentiment_analyze_batch_item_model))
})

# Define the endpoint for the Analyze sentiment of a text.
@api.route('/analyze')
class SentimentAnalyze(Resource):
Expand Down Expand Up @@ -88,7 +101,54 @@ def post(self):
# print(f"[error] [Route Layer] [SentimentAnalyze] [post] An error occurred: {str(e)}")
return {
'status': 'error',
"error": 'An unexpected error occurred while processing the request.', # Generic error message
"error": 'An unexpected error occurred while processing the request.',
'data': None
}, 500

# Define the endpoint for batch sentiment analysis.
@api.route('/analyze/batch')
class SentimentAnalyzeBatch(Resource):
    @api.doc(description="Analyze sentiment of a batch of texts.")
    @api.expect(sentiment_analyze_batch_request_model)
    @api.response(200, 'Success', sentiment_analyze_batch_success_model)
    @api.response(400, 'Bad Request', sentiment_analyze_bad_request_model)
    @api.response(500, 'Internal Server Error', sentiment_analyze_internal_server_error_model)
    def post(self):
        """
        Endpoint to analyze sentiment of a batch of texts.
        - texts (list[str]): List of input texts for sentiment analysis.
        """
        try:
            payload = request.json
            texts = payload.get('texts')

            # Reject anything that is not a non-empty list.
            if not isinstance(texts, list) or not texts:
                return {
                    'status': 'error',
                    'error': 'texts is required and must be a non-empty list.',
                    'data': None
                }, 400

            analysis = service.analyze(texts)

            # The service layer signals failure by returning an error dict.
            if isinstance(analysis, dict) and 'error' in analysis:
                return {
                    'status': 'error',
                    'error': analysis['error'],
                    'data': None
                }, 500

            return {
                'status': 'success',
                'data': analysis
            }

        except Exception as e:
            logger.error(f"[error] [Route Layer] [SentimentAnalyzeBatch] [post] An error occurred: {str(e)}")
            return {
                'status': 'error',
                'error': 'An unexpected error occurred while processing the request.',
                'data': None
            }, 500

Expand Down
89 changes: 49 additions & 40 deletions app/services/audio_transcription_sentiment_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,32 @@ def process(self, url: str, start_time_ms: int, end_time_ms: int = None, user_id
os.remove(audio_path)


# Step(3) Perform sentiment [Per chunk :D]
for chunk in chunks:
timestamp = chunk['timestamp']
text = chunk['text']

sentiment_result = self.sentiment_service.analyze(text)
if isinstance(sentiment_result, dict) and 'error' in sentiment_result:
logger.error(f"[error] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [sentiment_result]", sentiment_result)
# print("[error] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [sentiment_result]", sentiment_result)
chunk['error'] = sentiment_result['error'] # Add the error message to the chunk
continue # Skip this chunk if there was an error :D

# Step(3) Perform sentiment analysis [Batch Processing]
try:
# Extract all text chunks into a list for batch processing
texts_to_analyze = [chunk['text'] for chunk in chunks]

if texts_to_analyze:
# Perform batch inference
batch_results = self.sentiment_service.analyze(texts_to_analyze)

if isinstance(batch_results, dict) and 'error' in batch_results:
logger.error(f"[error] [Service Layer] [Pipeline] Batch sentiment analysis failed: {batch_results['error']}")
# Handle the error by marking chunks if necessary
else:
# Map the results back to each chunk
for i, result in enumerate(batch_results):
chunks[i]['label'] = result.get('label')
chunks[i]['confidence'] = result.get('confidence')

if self.debug:
logger.debug("[debug] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [sentiment_result]", sentiment_result)
# print("[debug] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] [sentiment_result]", sentiment_result)
logger.debug(f"[debug] [Service Layer] [Pipeline] Processed {len(chunks)} chunks using batching.")

# Add the sentiment result to the chunk
chunk['label'] = sentiment_result['label']
chunk['confidence'] = sentiment_result['confidence']
except Exception as e:
logger.error(f"[error] [Service Layer] [AudioTranscriptionSentimentPipeline] [process] Sentiment batching failed: {str(e)}")
# Optional: fallback to the original chunks without sentiment if critical

# Return the transcription, sentiment analysis, and audio segment details
# Return the full result
return {
'audio_path': audio_path,
'start_time_ms': start_time_ms,
Expand All @@ -134,29 +139,33 @@ def process(self, url: str, start_time_ms: int, end_time_ms: int = None, user_id


# if __name__ == "__main__":
# # Initialize the pipeline
# pipeline = AudioTranscriptionSentimentPipeline()
# print("pipeline",pipeline)

# # URL to Video File
# result = pipeline.process("https://drive.usercontent.google.com/u/2/uc?id=1BJ-0fvbc0mlDWaBGci0Ma-f1k6iElh6v", 0, 10000)
# print("result",result)

# # Invalid URL Video
# result = pipeline.process("https://invalid-url.com/video.mp4", 0, 10000)
# print("result",result)

# # Local Video File Path
# result = pipeline.process("./samples/sample_0.mp4", 0, 10000)
# print("result",result)

# # Invalid Video File Path
# result = pipeline.process("./samples/non-exist.mp4", 0, 10000)
# print("result",result)

# # Local Audio File Path
# result = pipeline.process("./samples/sample_1.mp3", 0, 10000)
# print("result",result)

# print(f"Pipeline initialized: {pipeline}")

# # Test with a local video or audio file (Make sure the path exists)
# sample_path = "./samples/sample_0.mp4"

# if os.path.exists(sample_path):
# print(f"\n--- Processing Sample: {sample_path} ---")
# # Extract and analyze the first 30 seconds
# result = pipeline.process(sample_path, start_time_ms=0, end_time_ms=30000)

# if 'error' in result:
# print(f"Error occurred: {result['error']}")
# else:
# print(f"Successfully processed audio: {result['audio_path']}")
# print(f"Full Transcription: {result['transcription'][:100]}...") # Print first 100 chars

# # Check the batch results
# utterances = result.get('utterances_sentiment', [])
# print(f"Number of chunks analyzed: {len(utterances)}")

# # Print the first few results to verify
# for i, chunk in enumerate(utterances[:3]):
# print(f"Chunk {i}: Text: '{chunk['text'][:30]}...' -> Label: {chunk.get('label')}, Confidence: {chunk.get('confidence')}")
# else:
# print(f"\n[Note] Sample file not found at {sample_path}. Skip local test.")

# # Run:
# # python -m app.services.audio_transcription_sentiment_pipeline
41 changes: 25 additions & 16 deletions app/services/sentiment_service.py
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question — why did you change this file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modified the Service Layer to ensure compatibility with the new batch processing logic in the model. Since the model now returns a list of dictionaries instead of a single result, the Service Layer needed to be type-aware.

I implemented two main things there:

Handling Polymorphism: I used isinstance() to check if the input is a single string or a list. This keeps the API backward compatible, meaning it still works perfectly for single requests.

Standardizing the response: by using a list comprehension to call format_response, I ensured that the output format remains consistent and clean, whether the user sends one sentence or many.

image

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

# Data Layer for fetching and processing transcripts
from app.data.sentiment_data import SentimentDataLayer
from typing import Union

config = Config().config # Load the configuration

Expand All @@ -16,22 +17,25 @@ def __init__(self):

self.sentiment_data_layer = SentimentDataLayer(config)

def analyze(self, text: str) -> dict:
def analyze(self, texts: Union[str, list]) -> Union[dict, list]:
"""
Perform sentiment analysis on the given text.
:param text: Input text for sentiment analysis.
Perform sentiment analysis on the given text or list of texts.
:param texts: Input text or list of texts for sentiment analysis.
:return: predicted label, and confidence score.
"""
try:
result = self.sentiment_data_layer.analyze(text)
results = self.sentiment_data_layer.analyze(texts)

if isinstance(result, dict) and 'error' in result:
if isinstance(results, dict) and 'error' in results:
return {
'error': result['error']
'error': results['error']
}

# Return the predicted label and confidence score
return self.format_response(result)
if isinstance(texts, str):
return self.format_response(results)

# Batch processing: format each result in the list
return [self.format_response(res) for res in results]

except Exception as e:
logger.error(f"[error] [Service Layer] [SentimentService] [analyze] An error occurred during sentiment analysis: {str(e)}")
Expand All @@ -50,14 +54,19 @@ def format_response(self, result: dict) -> dict:
# if __name__ == "__main__":
# sentiment_service = SentimentService()

# result = sentiment_service.analyze("I love this product!")
# print("result",result)
# test_texts = [
# "I love this product!",
# "I hate this product!",
# "I am neutral about this product."
# ]

# result = sentiment_service.analyze("I hate this product!")
# print("result",result)
# print("\n--- Testing Batch Inference ---")
# batch_result = sentiment_service.analyze(test_texts)
# print("Batch Result:", batch_result)

# result = sentiment_service.analyze("I am neutral about this product.")
# print("result",result)
# print("\n--- Testing Single Input ---")
# single_result = sentiment_service.analyze("This is a great day!")
# print("Single Result:", single_result)

# # Run:
# # python -m app.services.sentiment_service
# Run:
# python -m app.services.sentiment_service
Loading