@@ -287,9 +344,18 @@ def get_all_predictions(self, predictions):
# --- Sidebar ---
with st.sidebar:
- # --- Dark Mode Toggle ---
+    # --- Initialize Session State ---
+    # (review) Every key below is created only when 'dark_mode' is missing.
+    # A session that already has 'dark_mode' but predates the newer keys will
+    # never get them initialized — guard each key individually (setdefault).
if 'dark_mode' not in st.session_state:
st.session_state.dark_mode = False
+ st.session_state.classification_history = []
+ st.session_state.ensemble_history = []
+ st.session_state.model_stats = {}
+ tracker = ecm.ModelPerformanceTracker()
+ st.session_state.ensemble_tracker = tracker
+ st.session_state.ensemble_classifier = ecm.EnsembleSpamClassifier(
+ performance_tracker=tracker
+ )
+ st.session_state.loaded_models = {}
st.markdown("""
Analysis Mode
@@ -358,37 +424,79 @@ def get_all_predictions(self, predictions):
# Sidebar Overall Stats
st.markdown("### ๐ Overall Statistics")
- total_single_predictions = sum(st.session_state.model_stats[model]['total'] for model in MODEL_OPTIONS)
- total_ensemble_predictions = len(st.session_state.ensemble_history)
- total_predictions_overall = total_single_predictions + total_ensemble_predictions
- st.markdown(f"""
-
-
Total Predictions
-
{total_predictions_overall}
-
- """, unsafe_allow_html=True)
+ # Safely calculate statistics with proper error handling
+ try:
+ # Ensure session state is properly initialized
+ if not hasattr(st.session_state, 'model_stats'):
+ st.session_state.model_stats = {model: {'spam': 0, 'ham': 0, 'total': 0} for model in MODEL_OPTIONS.keys()}
+ if not hasattr(st.session_state, 'ensemble_history'):
+ st.session_state.ensemble_history = []
+
+ total_single_predictions = sum(
+ st.session_state.model_stats.get(model, {}).get('total', 0)
+ for model in MODEL_OPTIONS.keys()
+ )
+ total_ensemble_predictions = len(st.session_state.ensemble_history)
+ total_predictions_overall = total_single_predictions + total_ensemble_predictions
- overall_spam_count = sum(st.session_state.model_stats[model]['spam'] for model in MODEL_OPTIONS) + \
- sum(1 for entry in st.session_state.ensemble_history if entry['prediction'] == 'SPAM')
- overall_ham_count = sum(st.session_state.model_stats[model]['ham'] for model in MODEL_OPTIONS) + \
- sum(1 for entry in st.session_state.ensemble_history if entry['prediction'] == 'HAM')
- col_spam, col_ham = st.columns(2)
- with col_spam:
st.markdown(f"""
-
-
Spam Count
-
{overall_spam_count}
+
+
Total Predictions
+
{total_predictions_overall}
""", unsafe_allow_html=True)
- with col_ham:
- st.markdown(f"""
-
-
Ham Count
-
{overall_ham_count}
+
+ overall_spam_count = sum(
+ st.session_state.model_stats.get(model, {}).get('spam', 0)
+ for model in MODEL_OPTIONS.keys()
+ ) + sum(1 for entry in st.session_state.ensemble_history if entry['prediction'] == 'SPAM')
+
+ overall_ham_count = sum(
+ st.session_state.model_stats.get(model, {}).get('ham', 0)
+ for model in MODEL_OPTIONS.keys()
+ ) + sum(1 for entry in st.session_state.ensemble_history if entry['prediction'] == 'HAM')
+
+ col_spam, col_ham = st.columns(2)
+ with col_spam:
+ st.markdown(f"""
+
+
Spam Count
+
{overall_spam_count}
+
+ """, unsafe_allow_html=True)
+ with col_ham:
+ st.markdown(f"""
+
+
Ham Count
+
{overall_ham_count}
+
+ """, unsafe_allow_html=True)
+ except (AttributeError, KeyError, TypeError):
+ # Handle case where session state is not properly initialized
+ st.markdown("""
+
""", unsafe_allow_html=True)
+ col_spam, col_ham = st.columns(2)
+ with col_spam:
+ st.markdown("""
+
+ """, unsafe_allow_html=True)
+ with col_ham:
+ st.markdown("""
+
+ """, unsafe_allow_html=True)
+
# --- Model Loading Helpers ---
@st.cache_resource
@@ -417,8 +525,8 @@ def _load_model_cached(model_id):
if tokenizer is None or model is None:
return None
pipe = pipeline(
- "text-classification",
- model=model,
+ "text-classification",
+ model=model,
tokenizer=tokenizer,
device=0 if torch.cuda.is_available() else -1
)
@@ -427,35 +535,23 @@ def _load_model_cached(model_id):
st.error(f"โ Error creating pipeline for {model_id}: {str(e)}")
return None
+
def load_model_if_needed(model_name, _progress_callback=None):
- if st.session_state.loaded_models[model_name] is None:
+ """Load a model if not already loaded, with optional progress callback."""
+ if model_name not in st.session_state.loaded_models or st.session_state.loaded_models[model_name] is None:
+ if _progress_callback:
+ _progress_callback(f"Loading {model_name} model...")
+
model_id = MODEL_OPTIONS[model_name]["id"]
- status_container = st.empty()
- def update_status(message):
- if status_container:
- status_container.info(message)
- if _progress_callback:
- _progress_callback(message)
- try:
- update_status(f"Starting to load {model_name}...")
- update_status(f"๐ Loading tokenizer for {model_name}...")
- update_status(f"๐ค Loading {model_name} model... (This may take a few minutes)")
- model = _load_model_cached(model_id)
- if model is not None:
- update_status(f"โ
Successfully loaded {model_name}")
- st.session_state.loaded_models[model_name] = model
- else:
- update_status(f"โ Failed to load {model_name}")
- return None
- time.sleep(1)
- except Exception as e:
- update_status(f"โ Error loading {model_name}: {str(e)}")
- return None
- finally:
- time.sleep(1)
- status_container.empty()
+ st.session_state.loaded_models[model_name] = _load_model_cached(model_id)
+
+ if _progress_callback:
+ _progress_callback(f"โ
{model_name} model loaded successfully!")
+
return st.session_state.loaded_models[model_name]
+
+
def get_loaded_models():
models = {}
progress_bar = st.progress(0)
@@ -946,14 +1042,21 @@ def render_ensemble_dashboard():
st.markdown("### ๐ง Ensemble Method Analytics")
# Analyze ensemble history
- method_stats = defaultdict(lambda: {'count': 0, 'spam': 0, 'confidences': []})
+ method_stats: Dict[str, MethodStats] = {}
+ for method in ENSEMBLE_METHODS.keys():
+ method_stats[method] = MethodStats(
+ count=0,
+ spam=0,
+ confidences=[]
+ )
for item in st.session_state.ensemble_history:
method = item['method']
- method_stats[method]['count'] += 1
- method_stats[method]['confidences'].append(item['confidence'])
- if item['prediction'] == 'SPAM':
- method_stats[method]['spam'] += 1
+ if method in method_stats:
+ method_stats[method]['count'] += 1
+ method_stats[method]['confidences'].append(item['confidence'])
+ if item['prediction'] == 'SPAM':
+ method_stats[method]['spam'] += 1
if method_stats:
col1, col2 = st.columns(2)
@@ -1255,7 +1358,7 @@ def render_realtime_monitor():
'total_messages': len(st.session_state.classification_history) + len(st.session_state.ensemble_history)
}
- json_data = st.json.dumps(dashboard_data, indent=2, default=str)
+ json_data = json.dumps(dashboard_data, indent=2, default=str)
st.download_button(
label="๐ฅ Download Dashboard Data (JSON)",
@@ -1363,13 +1466,40 @@ def predict_proba_batch(texts: List[str]) -> np.ndarray:
# Set initial value of text_area based on sample_selector or previous user input
user_sms_initial_value = selected_message if selected_message else st.session_state.get('user_sms_input_value', "")
- user_sms = st.text_area(
- "Enter SMS message to analyse",
- value=user_sms_initial_value,
- height=120,
- placeholder="Type or paste your SMS message here...",
- help="Enter the SMS message you want to classify as spam or ham (legitimate)"
- )
+ col_text, col_record = st.columns([4, 1])
+
+ with col_text:
+ user_sms = st.text_area(
+ "Enter SMS message to analyse",
+ value=user_sms_initial_value,
+ height=120,
+ placeholder="Type or paste your SMS message here...",
+ help="Enter the SMS message you want to classify as spam or ham (legitimate)",
+ key="sms_input"
+ )
+
+ with col_record:
+ if st.button("๐ค Record Voice", type="primary", key="record_btn"):
+ with st.spinner("๐ค Listening... Speak clearly"):
+ try:
+ success, result = st.session_state.speech_handler.listen_and_transcribe()
+ if success:
+ # Get existing text and append new text
+ current_text = user_sms # Use the widget value directly
+ if current_text and current_text.strip():
+ new_text = f"{current_text.strip()} {result}"
+ else:
+ new_text = result
+ # Update session state for persistence
+ st.session_state.user_sms_input_value = new_text
+ st.success("โ
Speech recognized!")
+ st.rerun()
+ else:
+ st.error(f"โ {result}")
+ except Exception as e:
+ st.error(f"Error: {str(e)}")
+ time.sleep(2)
+
# Store current text_area value in session state for persistence
st.session_state.user_sms_input_value = user_sms
@@ -1471,6 +1601,12 @@ def predict_proba_batch(texts: List[str]) -> np.ndarray:
result = classifier(cleaned_sms)[0]
label = result['label'].upper()
confidence = result['score']
+                        # (review) removed unresolved merge-conflict residue: "feature/voice-to-sms"
+ # Safely update model statistics with proper error handling
+ if selected_model_name not in st.session_state.model_stats:
+ st.session_state.model_stats[selected_model_name] = {'spam': 0, 'ham': 0, 'total': 0}
+
+
# Store prediction results in session state for explanation
st.session_state.user_sms = user_sms
@@ -1502,6 +1638,7 @@ def predict_proba_batch(texts: List[str]) -> np.ndarray:
cleaned_sms, confidence
)
+                    # (review) removed unresolved merge-conflict residue: "master"
st.session_state.model_stats[selected_model_name][label.lower()] += 1
st.session_state.model_stats[selected_model_name]['total'] += 1
st.session_state.classification_history.append({
@@ -1709,7 +1846,7 @@ def predict_proba_batch(texts: List[str]) -> np.ndarray:
st.markdown("#### ๐ Single Model Performance")
# Check if there's any data for any model
- if any(st.session_state.model_stats[model]['total'] > 0 for model in MODEL_OPTIONS):
+ if any(st.session_state.model_stats.get(model, {}).get('total', 0) > 0 for model in MODEL_OPTIONS.keys()):
# Pie Chart for Spam/Ham Distribution of the SELECTED model
current_model_stats = st.session_state.model_stats[selected_model_name]
if current_model_stats['total'] > 0:
@@ -2169,4 +2306,3 @@ def classify_csv(file, ensemble_mode, selected_models_for_bulk, selected_ensembl
""", unsafe_allow_html=True)
-
diff --git a/requirements.txt b/requirements.txt
index 64c6b43..aa00c21 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,6 +24,21 @@ python-dotenv>=1.0.0,<2.0.0
pytest>=7.4.0,<8.0.0
pytest-cov>=4.1.0,<5.0.0
-#UI
-streamlit
-fpdf
\ No newline at end of file
+# UI Components
+streamlit>=1.30.0
+fpdf2>=2.7.0
+
+# Speech Recognition
+SpeechRecognition>=3.10.0
+PyAudio>=0.2.13
+pocketsphinx>=5.0.0
+
+# Type Checking
+mypy>=1.5.0
+types-requests>=2.31.0
+types-setuptools>=68.0.0
+types-protobuf>=4.24.0
+typing-extensions>=4.7.0
+pandas-stubs>=2.0.0
+types-tqdm>=4.65.0
+types-PyYAML>=6.0.0
diff --git a/speech_handler.py b/speech_handler.py
new file mode 100644
index 0000000..19ff284
--- /dev/null
+++ b/speech_handler.py
@@ -0,0 +1,151 @@
+"""
+Speech Handler Module
+
+A friendly and robust speech recognition handler that converts speech to text.
+Handles microphone input, noise adjustment, and multiple recognition engines.
+"""
+
+import speech_recognition as sr
+from typing import Tuple, Any
+
+
+class SpeechHandler:
+    """
+    A user-friendly speech recognition handler.
+
+    This class provides an easy-to-use interface for converting speech to text
+    with automatic error handling and fallback mechanisms.
+    """
+
+    def __init__(self):
+        """Initialize the speech handler with optimized settings."""
+        # Typed as Any to keep strict type-checkers quiet about the sr stubs;
+        # the concrete type is speech_recognition.Recognizer.
+        self.recognizer: Any = sr.Recognizer()
+        self._setup_recognizer()
+
+    def _setup_recognizer(self) -> None:
+        """Configure the recognizer with optimal settings for better accuracy."""
+        # hasattr guards let stub/mock recognizers pass through in tests.
+        # Adjust energy threshold for better speech detection
+        if hasattr(self.recognizer, "energy_threshold"):
+            self.recognizer.energy_threshold = 300
+
+        # Enable dynamic energy threshold for adaptive listening
+        if hasattr(self.recognizer, "dynamic_energy_threshold"):
+            self.recognizer.dynamic_energy_threshold = True
+
+        # Set pause threshold for natural speech breaks
+        if hasattr(self.recognizer, "pause_threshold"):
+            self.recognizer.pause_threshold = 0.8
+
+    def _validate_audio(self, audio) -> bool:
+        """Validate that the captured audio is suitable for processing."""
+        if not audio or not hasattr(audio, 'sample_rate'):
+            return False
+
+        # Check if audio is too short (less than 0.5 seconds)
+        # NOTE(review): frame_data length is in *bytes*; dividing by
+        # sample_rate alone ignores the sample width (2 bytes for 16-bit
+        # audio), so this overestimates duration by ~2x — confirm the
+        # intended threshold.
+        if len(audio.frame_data) / audio.sample_rate < 0.5:
+            return False
+
+        return True
+
+    def listen_and_transcribe(self, timeout: int = 5, max_duration: int = 10) -> Tuple[bool, str]:
+        """
+        Listen for speech and convert it to text.
+
+        Args:
+            timeout: Maximum time to wait for speech (seconds)
+            max_duration: Maximum duration of speech to record (seconds)
+
+        Returns:
+            Tuple of (success: bool, result: str)
+        """
+        try:
+            # Create a new microphone instance for fresh capture
+            microphone = sr.Microphone()
+
+            with microphone as source:
+                print("๐ค Adjusting for ambient noise... Please wait.")
+                # Adjust for ambient noise to improve recognition
+                self.recognizer.adjust_for_ambient_noise(source, duration=1)
+
+                try:
+                    print("๐ค Listening... Please speak clearly.")
+                    audio = self.recognizer.listen(
+                        source,
+                        timeout=timeout,
+                        phrase_time_limit=max_duration
+                    )
+
+                    # Validate the captured audio
+                    if not self._validate_audio(audio):
+                        return False, "โ Audio too short or invalid. Please try again."
+
+                except sr.WaitTimeoutError:
+                    return False, "โฐ No speech detected. Please try speaking again."
+                except Exception as e:
+                    return False, f"โ Error recording audio: {str(e)}"
+
+            # Convert speech to text with fallback mechanisms
+            return self._transcribe_audio(audio)
+
+        except Exception as e:
+            return False, f"๐ฅ Critical error: {str(e)}"
+
+    def _transcribe_audio(self, audio) -> Tuple[bool, str]:
+        """Transcribe the captured audio to text using multiple engines."""
+        try:
+            # Primary: Try Google's speech recognition (most accurate)
+            # NOTE(review): recognize_google calls an online API — requires
+            # network access at call time.
+            try:
+                print("๐ Processing with Google Speech Recognition...")
+                result = self.recognizer.recognize_google(audio)
+                if result and result.strip():
+                    print(f"โ
+ Recognized: '{result}'")
+                    return True, result
+            except (sr.UnknownValueError, sr.RequestError) as e:
+                print(f"โ ๏ธ Google recognition failed: {str(e)}")
+
+            # Fallback: Try local Sphinx recognizer if available
+            try:
+                print("๐ Trying offline recognition...")
+                result = self.recognizer.recognize_sphinx(audio)
+                if result and result.strip():
+                    print(f"โ
+ Offline recognition: '{result}'")
+                    return True, result
+            except ImportError:
+                print("โน๏ธ Offline recognizer not available")
+            except Exception as e:
+                print(f"โ ๏ธ Offline recognition failed: {str(e)}")
+
+            return False, "โ Could not understand the audio. Please speak more clearly."
+
+        except Exception as e:
+            return False, f"โ Speech recognition error: {str(e)}"
+
+    def cleanup(self) -> None:
+        """Clean up resources when done."""
+        # NOTE(review): deleting the attribute leaves this handler unusable —
+        # any later listen_and_transcribe() call raises AttributeError.
+        # Confirm cleanup() is only ever called at shutdown.
+        try:
+            if hasattr(self, 'recognizer'):
+                del self.recognizer
+            print("๐งน Speech handler cleaned up successfully")
+        except Exception as e:
+            print(f"โ ๏ธ Cleanup warning: {str(e)}")
+
+
+# Example usage
+# Manual demo — requires a working microphone; not run by automated tests.
+if __name__ == "__main__":
+    print("๐ค Speech Handler Demo")
+    print("=" * 50)
+
+    handler = SpeechHandler()
+
+    try:
+        success, text = handler.listen_and_transcribe(timeout=5, max_duration=10)
+
+        if success:
+            print(f"\n๐ Success! Transcribed text: '{text}'")
+        else:
+            print(f"\nโ Failed: {text}")
+
+    except KeyboardInterrupt:
+        print("\nโน๏ธ Stopped by user")
+    finally:
+        # Always release recognizer resources, even on Ctrl+C.
+        handler.cleanup()
diff --git a/speech_to_text.py b/speech_to_text.py
new file mode 100644
index 0000000..5b9399a
--- /dev/null
+++ b/speech_to_text.py
@@ -0,0 +1,313 @@
+"""
+๐ค Speech to Text Module
+
+A friendly and robust speech recognition system with real-time processing capabilities.
+Supports both continuous listening and one-time transcription with callback support.
+
+Features:
+- ๐ฏ Real-time continuous listening
+- ๐ One-time transcription
+- ๐ก๏ธ Comprehensive error handling
+- ๐จ User-friendly interface
+- โก Multi-threaded processing
+"""
+
+import speech_recognition as sr
+import threading
+import queue
+import time
+from typing import Optional, Callable, Dict, Any
+
+
+class SpeechToText:
+    """
+    ๐ค Friendly Speech-to-Text Converter
+
+    This class provides an easy-to-use interface for converting speech to text
+    with both real-time continuous listening and one-time transcription capabilities.
+
+    Features:
+    - Real-time speech recognition with callbacks
+    - Single utterance transcription
+    - Robust error handling with user-friendly messages
+    - Multi-threaded processing for smooth performance
+    - Automatic microphone calibration
+    """
+
+    def __init__(self):
+        """Initialize the speech-to-text system with optimal settings.
+
+        NOTE(review): construction has side effects — it prints to stdout and
+        opens the default microphone for ~1 s of ambient-noise calibration.
+        """
+        print("๐ Initializing Speech-to-Text system...")
+
+        self.recognizer = sr.Recognizer()
+        self.microphone = sr.Microphone()
+        self.is_listening = False
+        # See NOTE in _listen_loop: this queue is filled but never drained.
+        self.audio_queue = queue.Queue()
+        self.error_callback: Optional[Callable] = None
+        self.text_callback: Optional[Callable] = None
+        self.processing_thread: Optional[threading.Thread] = None
+
+        # Configure optimal settings
+        self._setup_recognizer()
+        print("โ
 Speech-to-Text system ready!")
+
+    def _setup_recognizer(self) -> None:
+        """Configure the recognizer with optimal settings for better accuracy."""
+        try:
+            print("๐ค Calibrating microphone for ambient noise...")
+            with self.microphone as source:
+                self.recognizer.adjust_for_ambient_noise(source, duration=1)
+            print("โ
 Microphone calibrated successfully!")
+
+            # Set optimal recognition parameters
+            self.recognizer.energy_threshold = 300
+            self.recognizer.dynamic_energy_threshold = True
+            self.recognizer.pause_threshold = 0.8
+
+        except OSError as e:
+            # No microphone / no permission: report and keep library defaults.
+            print(f"โ ๏ธ Microphone access error: {str(e)}")
+            print("๐ก Please check your microphone permissions and try again.")
+        except Exception as e:
+            print(f"โ ๏ธ Microphone setup warning: {str(e)}")
+            print("๐ง Using default microphone settings.")
+
+    def set_callbacks(self, text_callback: Callable, error_callback: Callable) -> None:
+        """
+        Set callback functions for handling transcription results and errors.
+
+        Args:
+            text_callback: Function to call when text is recognized
+            error_callback: Function to call when an error occurs
+        """
+        self.text_callback = text_callback
+        self.error_callback = error_callback
+        print("๐ Callbacks configured successfully!")
+
+    def start_listening(self) -> None:
+        """Start continuous listening in a background thread.
+
+        NOTE(review): callbacks are invoked on worker threads — anything they
+        touch must be thread-safe.
+        """
+        if not self.is_listening:
+            self.is_listening = True
+            print("๐ค Starting continuous listening...")
+            threading.Thread(target=self._listen_loop, daemon=True).start()
+        else:
+            print("โน๏ธ Already listening!")
+
+    def stop_listening(self) -> None:
+        """Stop the continuous listening loop."""
+        # Flag flip only; the loop exits after its current listen() returns.
+        if self.is_listening:
+            self.is_listening = False
+            print("โน๏ธ Stopped listening.")
+        else:
+            print("โน๏ธ Not currently listening.")
+
+    def _listen_loop(self) -> None:
+        """Main listening loop that runs in a separate thread."""
+        while self.is_listening:
+            try:
+                with self.microphone as source:
+                    audio = self.recognizer.listen(
+                        source,
+                        timeout=5,
+                        phrase_time_limit=10
+                    )
+                    # NOTE(review): this queue is only ever filled (get_status
+                    # just reports its size) and never drained, so it grows
+                    # without bound in long sessions — confirm it is still
+                    # needed, since _process_audio receives the audio directly.
+                    self.audio_queue.put(audio)
+
+                    # Process audio in a separate thread to avoid blocking
+                    threading.Thread(
+                        target=self._process_audio,
+                        args=(audio,),
+                        daemon=True
+                    ).start()
+
+            except sr.WaitTimeoutError:
+                continue  # No speech detected, continue listening
+            except Exception as e:
+                if self.error_callback:
+                    self.error_callback(f"โ Error capturing audio: {str(e)}")
+                time.sleep(1)  # Prevent tight loop on error
+
+    def _process_audio(self, audio) -> None:
+        """Process captured audio and convert to text with fallback options."""
+        try:
+            print("๐ Processing audio...")
+            # Try Google Speech Recognition first (most accurate)
+            # NOTE(review): online API — needs network access at call time.
+            text = self.recognizer.recognize_google(audio)  # type: ignore
+
+            if text and text.strip():
+                print(f"โ
 Recognized: '{text}'")
+                if self.text_callback:
+                    self.text_callback(text)
+            else:
+                print("โ ๏ธ Empty recognition result")
+
+        except sr.UnknownValueError:
+            error_msg = "โ Could not understand the audio. Please speak more clearly."
+            print(error_msg)
+            if self.error_callback:
+                self.error_callback(error_msg)
+
+        except sr.RequestError as e:
+            error_msg = f"โ Could not connect to Google Speech Recognition: {str(e)}"
+            print(error_msg)
+            print("๐ก Trying offline recognition...")
+
+            # Fallback to offline recognition if available
+            try:
+                text = self.recognizer.recognize_sphinx(audio)  # type: ignore
+                if text and text.strip():
+                    print(f"โ
 Offline recognition: '{text}'")
+                    if self.text_callback:
+                        self.text_callback(text)
+                else:
+                    error_msg = "โ Offline recognition also failed"
+                    print(error_msg)
+                    if self.error_callback:
+                        self.error_callback(error_msg)
+            except ImportError:
+                error_msg = "โ Offline recognizer not available. Please install PocketSphinx."
+                print(error_msg)
+                if self.error_callback:
+                    self.error_callback(error_msg)
+            except Exception as offline_error:
+                error_msg = f"โ Offline recognition failed: {str(offline_error)}"
+                print(error_msg)
+                if self.error_callback:
+                    self.error_callback(error_msg)
+
+        except Exception as e:
+            error_msg = f"๐ฅ Unexpected error processing audio: {str(e)}"
+            print(error_msg)
+            if self.error_callback:
+                self.error_callback(error_msg)
+
+    def transcribe_once(self, timeout: int = 5, max_duration: int = 10) -> Optional[str]:
+        """
+        Capture and transcribe a single utterance with user-friendly feedback.
+
+        Args:
+            timeout: Maximum time to wait for speech (seconds)
+            max_duration: Maximum duration of speech to record (seconds)
+
+        Returns:
+            Transcribed text or None if failed
+        """
+        try:
+            print("๐ค Listening for single utterance... Speak now!")
+            print("(Press Ctrl+C to cancel)")
+
+            with self.microphone as source:
+                audio = self.recognizer.listen(
+                    source,
+                    timeout=timeout,
+                    phrase_time_limit=max_duration
+                )
+
+            print("๐ Processing your speech...")
+            # Try Google Speech Recognition first
+            text = self.recognizer.recognize_google(audio)  # type: ignore
+
+            if text and text.strip():
+                print(f"๐ Success! Transcribed: '{text}'")
+                return text
+            else:
+                print("โ ๏ธ No speech content detected")
+                return None
+
+        except sr.UnknownValueError:
+            print("โ Could not understand the audio. Please speak more clearly.")
+        except sr.RequestError as e:
+            print(f"โ Could not connect to Google Speech Recognition: {str(e)}")
+            print("๐ก Trying offline recognition...")
+
+            # Fallback to offline recognition
+            # NOTE(review): 'audio' is always bound here — RequestError can
+            # only be raised after listen() already succeeded.
+            try:
+                text = self.recognizer.recognize_sphinx(audio)  # type: ignore
+                if text and text.strip():
+                    print(f"๐ Offline recognition successful: '{text}'")
+                    return text
+                else:
+                    print("โ Offline recognition also failed")
+            except ImportError:
+                print("โ Offline recognizer not available. Please install PocketSphinx for offline support.")
+            except Exception as offline_error:
+                print(f"โ Offline recognition failed: {str(offline_error)}")
+        except sr.WaitTimeoutError:
+            print("โฐ No speech detected within timeout period. Please try again.")
+        except KeyboardInterrupt:
+            print("\nโน๏ธ Cancelled by user")
+        except Exception as e:
+            print(f"๐ฅ Unexpected error: {str(e)}")
+
+        return None
+
+    def get_status(self) -> Dict[str, Any]:
+        """Get the current status of the speech recognition system."""
+        return {
+            "is_listening": self.is_listening,
+            "queue_size": self.audio_queue.qsize(),
+            "has_text_callback": self.text_callback is not None,
+            "has_error_callback": self.error_callback is not None
+        }
+
+
+# Example usage and demo
+def demo_text_callback(text: str) -> None:
+    """Demo callback for successful text recognition."""
+    print(f"๐ Text callback: {text}")
+
+
+def demo_error_callback(error: str) -> None:
+    """Demo callback for errors."""
+    print(f"๐จ Error callback: {error}")
+
+
+# Manual interactive demo — needs a microphone and a user at the keyboard;
+# not suitable for automated test runs.
+if __name__ == "__main__":
+    print("๐ค Speech to Text Demo")
+    print("=" * 50)
+
+    # Create instance
+    stt = SpeechToText()
+
+    print("\nChoose an option:")
+    print("1. Single transcription")
+    print("2. Continuous listening demo")
+    print("3. Status check")
+
+    try:
+        choice = input("\nEnter your choice (1-3): ").strip()
+
+        if choice == "1":
+            print("\n--- Single Transcription Mode ---")
+            result = stt.transcribe_once()
+            if result:
+                print(f"\nFinal result: {result}")
+            else:
+                print("\nNo result obtained.")
+
+        elif choice == "2":
+            print("\n--- Continuous Listening Mode ---")
+            stt.set_callbacks(demo_text_callback, demo_error_callback)
+            stt.start_listening()
+
+            print("Listening... Say something! (Press Ctrl+C to stop)")
+            try:
+                while True:
+                    time.sleep(1)
+            except KeyboardInterrupt:
+                stt.stop_listening()
+                print("\nStopped listening.")
+
+        elif choice == "3":
+            print("\n--- Status Check ---")
+            status = stt.get_status()
+            for key, value in status.items():
+                print(f"{key}: {value}")
+
+        else:
+            print("โ Invalid choice. Please run again.")
+
+    except KeyboardInterrupt:
+        print("\nโน๏ธ Demo interrupted by user")
+    except Exception as e:
+        print(f"๐ฅ Demo error: {str(e)}")
+
+    print("\n๐ Demo completed!")
diff --git a/types.py b/types.py
new file mode 100644
index 0000000..4ab188b
--- /dev/null
+++ b/types.py
@@ -0,0 +1,53 @@
+"""Type definitions for the Spamlyser application."""
+from typing import TypeVar, Dict, Any, List, Union, Optional
+from typing_extensions import TypedDict, Protocol
+
+# Type for model predictions
+class PredictionDict(TypedDict):
+ label: str
+ score: float
+ spam_probability: Optional[float]
+
+# Type for model statistics
+class ModelStats(TypedDict):
+ spam: int
+ ham: int
+ total: int
+
+# Type for method statistics
+class MethodStats(TypedDict):
+ count: int
+ spam: int
+ confidences: List[float]
+
+# Type for ensemble predictions
+class EnsemblePrediction(TypedDict):
+ label: str
+ confidence: float
+ method: str
+ spam_probability: float
+ details: str
+
+# Type for model options
+class ModelOption(TypedDict):
+ id: str
+ description: str
+ icon: str
+ color: str
+
+# Protocol for models
+class Model(Protocol):
+ def __call__(self, text: Union[str, List[str]]) -> List[PredictionDict]: ...
+
+# Type variables
+T = TypeVar('T')
+ModelName = str
+ModelDict = Dict[ModelName, Model]
+OptionsDict = Dict[ModelName, ModelOption]
+StatsDict = Dict[ModelName, ModelStats]
+MethodStatsDict = Dict[str, MethodStats]
+
+# Type aliases
+JsonDict = Dict[str, Any]
+Confidence = float
+Label = str