Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops — I’ve already cleaned up the .gitignore and reverted those unnecessary changes.

Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ logs/
# Ignore Testing Coverage Results
tests/coverage/.coverage

env/
env/
models_cache/
53 changes: 34 additions & 19 deletions app/data/sentiment_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,29 @@ def __init__(self, config: dict):
else:
raise ValueError(f"Unsupported sentiment analysis model: {self.default_model}")

def analyze(self, text: str) -> tuple:
def analyze(self, texts: str) -> list:
"""
Perform sentiment analysis on the given text.
:param text: Input text for sentiment analysis.
:return: Model outputs, probabilities, predicted label, and confidence score.
Perform sentiment analysis on the given text or list of texts.

:param text: Input text or list of texts for sentiment analysis.
:return: Dictionary for single input or list of dictionaries for batch input.
"""
try:
outputs, probabilities, predicted_label, confidence = self.model(text)
return {
# 'outputs': outputs,
# 'probabilities': probabilities,
'label': predicted_label,
'confidence': confidence
}

batch_results = self.model(texts)

results = []
for res in batch_results:
results.append({
'label': res['label'],
'confidence': res['confidence']
})


if isinstance(texts, str):
return results[0]

return results

except Exception as e:
logger.error(f"[error] [Data Layer] [SentimentDataLayer] [analyze] An error occurred during sentiment analysis: {str(e)}")
# print(f"[error] [Data Layer] [SentimentDataLayer] [analyze] An error occurred during sentiment analysis: {str(e)}")
Expand All @@ -57,13 +65,20 @@ def analyze(self, text: str) -> tuple:
# }
# }
# }
# print("config",config)
# sentiment_data = SentimentDataLayer(config)
# print("sentiment_data",sentiment_data)

# print(sentiment_data.analyze("I love this product!"))
# print(sentiment_data.analyze("I hate this product!"))
# print(sentiment_data.analyze("I am neutral about this product."))
# test_batch = [
# "I love this product!",
# "I hate this product!",
# "I am neutral about this product."
# ]

# print("\n--- Testing Batch Inference ---")
# results = sentiment_data.analyze(test_batch)
# print(results)

# # Run:
# # python -m app.data.sentiment_data
# print("\n--- Testing Single Inference ---")
# result = sentiment_data.analyze("I love this product!")
# print(result)
# Run:
# python -m app.data.sentiment_data
115 changes: 76 additions & 39 deletions app/models/bertweet_model.py
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you modify this file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to my understanding of the issue, I modified bertweet_model.py to implement Tensor-based batching directly within the model's forward pass.

Handling batches at the model level (using padding=True and truncation=True) leverages PyTorch's parallelism, making inference significantly faster than looping over strings. I've also ensured the method remains backward-compatible, so it still works perfectly for single strings (as it used to) while supporting lists of strings.

Regarding the logs, I've removed the initial debug logs to keep the PR focused and clean.

Screenshot 2026-03-21 051338

Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,39 @@
"""
import torch
import torch.nn as nn
import logging

from transformers import AutoTokenizer, AutoModelForSequenceClassification

logger = logging.getLogger(__name__)

class BertweetSentiment(nn.Module):
def __init__(self,config: dict)->None:
"""
Initialize the Bertweet model for sentiment analysis.
:param config: The configuration object containing model and device info.
"""
self.debug = config.get('debug')


super(BertweetSentiment, self).__init__()

self.config = config.get('sentiment_analysis').get('bertweet')
self.model_name = self.config.get('model_name')
self.device = self.config.get('device')

super(BertweetSentiment, self).__init__()
# Initialize the Tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
try:
# Initialize the Tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name, do_lower_case=True, clean_up_tokenization_spaces=True)

# Initialize the Model
self.model= AutoModelForSequenceClassification.from_pretrained(self.model_name)
self.model.to(self.device)
# Initialize the Model
self.model= AutoModelForSequenceClassification.from_pretrained(self.model_name)
self.model.to(self.device)

logger.info(f"Successfully loaded model: {self.model_name} on {self.device}")

except Exception as e:
logger.error(f"Failed to load BERTweet model: {str(e)}")
raise e

# Load the model configuration to get class labels
self.model_config = self.model.config
Expand All @@ -35,32 +46,46 @@ def __init__(self,config: dict)->None:
else:
self.class_labels = None

def forward(self, texts):
    """
    Perform sentiment analysis on a single text or a list of texts (batch).

    Args:
        texts (str or list[str]): Input text, or list of texts for batched
            inference.

    Returns:
        list[dict]: One dict per input, in input order, with keys:
            'text' (str): the original input text,
            'label' (str): predicted class label from self.class_labels,
            'confidence' (float): softmax probability of the predicted class.
    """
    # Backward compatibility: wrap a single string in a list so the
    # tokenizer always receives a batch.
    if isinstance(texts, str):
        texts = [texts]

    # Tokenize with padding/truncation so every sequence in the batch has
    # the same length; move tensors to the configured device.
    inputs = self.tokenizer(
        texts,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=128,
    ).to(self.device)

    # Inference only — disable gradient tracking for speed and memory.
    with torch.no_grad():
        outputs = self.model(**inputs)

    # Convert logits to per-class probabilities.
    probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)

    # torch.max over the class dimension yields both the confidence and the
    # predicted class index for every row of the batch at once.
    confidences, predicted_classes = torch.max(probabilities, dim=1)

    # Assemble results while preserving the original input order.
    results = []
    for i, text in enumerate(texts):
        results.append({
            "text": text,
            "label": self.class_labels[predicted_classes[i].item()],
            "confidence": confidences[i].item(),
        })

    return results


# if __name__ == "__main__":
Expand All @@ -74,19 +99,31 @@ def forward(self,text)->tuple:
# }
# }
# }
# print("config",config)
# logger.info(f"Config loaded: {config}")

# model = BertweetSentiment(config)
# print("model",model)
# print("model.class_labels",model.class_labels)

# text = "I love the new features of the app!"
# print(model(text))

# text = "I hate the new features of the app!"
# print(model(text))

# text = "Hi how are u?"
# print(model(text))

# logger.info(f"Model Labels: {model.class_labels}")

# test_texts = [
# "I love the new features of the app!",
# "I hate the new features of the app!",
# "Hi how are u?"
# ]

# logger.info(f"Running batch inference on {len(test_texts)} samples...")
# try:
# results = model(test_texts)

# # Display Results in a clean format
# logger.info("--- Batch Results ---")
# for res in results:
# logger.info(f"Text: {res['text']}")
# logger.info(f"Sentiment: {res['label']} | Confidence: {res['confidence']:.4f}")
# logger.info("-" * 20)

# except Exception as e:
# logger.error(f"An error occurred during testing: {e}")

# # Run:
# # python -m app.models.bertweet_model
65 changes: 62 additions & 3 deletions app/routes/sentiment_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ def register_routes(api):
})) # Embed the data model
})

# ── Models for /analyze/batch ──────────────────────────────────────────────
# Swagger schema for the POST /analyze/batch request payload.
sentiment_analyze_batch_request_model = api.model('SentimentAnalyzeBatchRequestModel', {
'texts': fields.List(fields.String, required=True, description='List of texts for sentiment analysis.', example=['I love this!', 'I hate this!'])
})

# Swagger schema for a successful batch response: one {label, confidence}
# item per input text, wrapped in the standard status envelope.
sentiment_analyze_batch_success_model = api.model('SentimentAnalyzeBatchSuccessModel', {
'status': fields.String(required=True, description='The status of the response', example='success'),
'data': fields.List(fields.Nested(api.model('SentimentAnalyzeBatchItemModel', {
'label': fields.String(required=True, description='Predicted sentiment label.', enum=['POS', 'NEG', 'NEU'], example='POS'),
'confidence': fields.Float(required=True, description='Confidence score of the prediction.', example=0.95)
})))
})

# Define the endpoint for the Analyze sentiment of a text.
@api.route('/analyze')
class SentimentAnalyze(Resource):
Expand Down Expand Up @@ -85,12 +98,58 @@ def post(self):

except Exception as e:
logger.error(f"[error] [Route Layer] [SentimentAnalyze] [post] An error occurred: {str(e)}")
# print(f"[error] [Route Layer] [SentimentAnalyze] [post] An error occurred: {str(e)}")
return {
'status': 'error',
"error": 'An unexpected error occurred while processing the request.', # Generic error message
"error": 'An unexpected error occurred while processing the request.',
'data': None
}, 500

# Define the endpoint for batch sentiment analysis.
@api.route('/analyze/batch')
class SentimentAnalyzeBatch(Resource):
    @api.doc(description="Analyze sentiment of a batch of texts.")
    @api.expect(sentiment_analyze_batch_request_model)
    @api.response(200, 'Success', sentiment_analyze_batch_success_model)
    @api.response(400, 'Bad Request', sentiment_analyze_bad_request_model)
    @api.response(500, 'Internal Server Error', sentiment_analyze_internal_server_error_model)
    def post(self):
        """
        Endpoint to analyze sentiment of a batch of texts.

        Request body:
            texts (list[str]): Non-empty list of input texts.

        Returns:
            200 with {'status': 'success', 'data': [...]} on success,
            400 when 'texts' is missing, not a list, or empty,
            500 when the service reports an error or an exception occurs.
        """
        try:
            data = request.json

            texts = data.get('texts')

            # Must be a non-empty list; `not texts` already covers the
            # empty-list case, so no separate len() check is needed.
            if not texts or not isinstance(texts, list):
                return {
                    'status': 'error',
                    'error': 'texts is required and must be a non-empty list.',
                    'data': None
                }, 400

            results = service.analyze(texts)

            # The service layer signals failure via an error dict instead of
            # raising — translate that into a 500 response.
            if isinstance(results, dict) and 'error' in results:
                return {
                    'status': 'error',
                    'error': results['error'],
                    'data': None
                }, 500

            return {
                'status': 'success',
                'data': results
            }

        except Exception as e:
            logger.error(f"[error] [Route Layer] [SentimentAnalyzeBatch] [post] An error occurred: {str(e)}")
            return {
                'status': 'error',
                'error': 'An unexpected error occurred while processing the request.',
                'data': None
            }, 500  # Internal Server Error

# Define the namespace for the sentiment endpoint
api = Namespace('Sentiment', description='Sentiment Operations')
Expand Down
Loading