diff --git a/src-tauri/src/actions.rs b/src-tauri/src/actions.rs index ca14c257e..0fba93200 100644 --- a/src-tauri/src/actions.rs +++ b/src-tauri/src/actions.rs @@ -3,6 +3,7 @@ use crate::apple_intelligence; use crate::audio_feedback::{play_feedback_sound, play_feedback_sound_blocking, SoundType}; use crate::managers::audio::AudioRecordingManager; use crate::managers::history::HistoryManager; +use crate::managers::operation::OperationCoordinator; use crate::managers::transcription::TranscriptionManager; use crate::settings::{get_settings, AppSettings, APPLE_INTELLIGENCE_PROVIDER_ID}; use crate::shortcut; @@ -13,7 +14,7 @@ use async_openai::types::{ CreateChatCompletionRequestArgs, }; use ferrous_opencc::{config::BuiltinConfig, OpenCC}; -use log::{debug, error}; +use log::{debug, error, info, warn}; use once_cell::sync::Lazy; use std::collections::HashMap; use std::sync::Arc; @@ -248,6 +249,16 @@ impl ShortcutAction for TranscribeAction { let start_time = Instant::now(); debug!("TranscribeAction::start called for binding: {}", binding_id); + // Start a new operation via the coordinator. This will automatically + // mark any in-progress operation as stale, preventing race conditions + // when rapidly toggling push-to-talk. + let coordinator = app.state::>(); + let operation_id = coordinator.start_recording(binding_id); + debug!( + "Started operation {} for binding: {}", + operation_id, binding_id + ); + // Load model in the background let tm = app.state::>(); tm.initiate_model_load(); @@ -299,6 +310,8 @@ impl ShortcutAction for TranscribeAction { }); } else { debug!("Failed to start recording"); + // If recording failed to start, complete the operation + coordinator.complete(operation_id); } } @@ -320,10 +333,34 @@ impl ShortcutAction for TranscribeAction { let stop_time = Instant::now(); debug!("TranscribeAction::stop called for binding: {}", binding_id); + // Get the current operation ID from the coordinator + let coordinator = app.state::>(); + let operation_id = match coordinator.active_operation_id() { + Some(id) => id, + None => { + warn!("TranscribeAction::stop called but no active operation found"); + utils::hide_recording_overlay(app); + change_tray_icon(app, TrayIconState::Idle); + return; + } + }; + + // Transition to processing phase + if !coordinator.transition_to_processing(operation_id) { + info!( + "Operation {} was superseded, aborting stop", + operation_id + ); + utils::hide_recording_overlay(app); + change_tray_icon(app, TrayIconState::Idle); + return; + } + let ah = app.clone(); let rm = Arc::clone(&app.state::>()); let tm = Arc::clone(&app.state::>()); let hm = Arc::clone(&app.state::>()); + let coordinator = Arc::clone(&coordinator); change_tray_icon(app, TrayIconState::Transcribing); show_transcribing_overlay(app); @@ -339,10 +376,27 @@ impl ShortcutAction for TranscribeAction { tauri::async_runtime::spawn(async move { let binding_id = binding_id.clone(); // Clone for the inner async task debug!( - "Starting async transcription task for binding: {}", - binding_id + "Starting async transcription task for binding: {} (operation {})", + binding_id, operation_id ); + // Helper to clean up UI and complete operation + let cleanup = |ah: &AppHandle, coordinator: &OperationCoordinator, op_id: u64| { + utils::hide_recording_overlay(ah); + change_tray_icon(ah, TrayIconState::Idle); + coordinator.complete(op_id); + }; + + // Check if operation is still active before proceeding + if !coordinator.is_active(operation_id) { + info!( + "Operation {} is no longer active, aborting transcription", + operation_id + ); + cleanup(&ah, &coordinator, operation_id); + return; + } + let stop_recording_time = Instant::now(); if let Some(samples) = rm.stop_recording(&binding_id) { debug!( @@ -351,6 +405,16 @@ impl ShortcutAction for TranscribeAction { samples.len() ); + // Check again before transcription (this is the expensive part) + if !coordinator.is_active(operation_id) { + info!( + "Operation {} superseded before transcription, aborting", + operation_id + ); + cleanup(&ah, &coordinator, operation_id); + return; + } + let transcription_time = Instant::now(); let samples_clone = samples.clone(); // Clone for history saving match tm.transcribe(samples) { @@ -360,6 +424,17 @@ impl ShortcutAction for TranscribeAction { transcription_time.elapsed(), transcription ); + + // Check again after transcription before pasting + if !coordinator.is_active(operation_id) { + info!( + "Operation {} superseded after transcription, not pasting", + operation_id + ); + cleanup(&ah, &coordinator, operation_id); + return; + } + if !transcription.is_empty() { let settings = get_settings(&ah); let mut final_text = transcription.clone(); @@ -392,6 +467,16 @@ impl ShortcutAction for TranscribeAction { } } + // Final check before pasting (after potentially slow post-processing) + if !coordinator.is_active(operation_id) { + info!( + "Operation {} superseded after post-processing, not pasting", + operation_id + ); + cleanup(&ah, &coordinator, operation_id); + return; + } + // Save to history with post-processed text and prompt let hm_clone = Arc::clone(&hm); let transcription_for_history = transcription.clone(); @@ -411,6 +496,7 @@ impl ShortcutAction for TranscribeAction { // Paste the final text (either processed or original) let ah_clone = ah.clone(); + let coordinator_clone = Arc::clone(&coordinator); let paste_time = Instant::now(); ah.run_on_main_thread(move || { match utils::paste(final_text, ah_clone.clone()) { @@ -423,27 +509,24 @@ impl ShortcutAction for TranscribeAction { // Hide the overlay after transcription is complete utils::hide_recording_overlay(&ah_clone); change_tray_icon(&ah_clone, TrayIconState::Idle); + coordinator_clone.complete(operation_id); }) .unwrap_or_else(|e| { error!("Failed to run paste on main thread: {:?}", e); - utils::hide_recording_overlay(&ah); - change_tray_icon(&ah, TrayIconState::Idle); + cleanup(&ah, &coordinator, operation_id); }); } else { - utils::hide_recording_overlay(&ah); - change_tray_icon(&ah, TrayIconState::Idle); + cleanup(&ah, &coordinator, operation_id); } } Err(err) => { debug!("Global Shortcut Transcription error: {}", err); - utils::hide_recording_overlay(&ah); - change_tray_icon(&ah, TrayIconState::Idle); + cleanup(&ah, &coordinator, operation_id); } } } else { debug!("No samples retrieved from recording stop"); - utils::hide_recording_overlay(&ah); - change_tray_icon(&ah, TrayIconState::Idle); + cleanup(&ah, &coordinator, operation_id); } }); diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index e39c792b6..12e3ce89f 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -22,6 +22,7 @@ use env_filter::Builder as EnvFilterBuilder; use managers::audio::AudioRecordingManager; use managers::history::HistoryManager; use managers::model::ModelManager; +use managers::operation::OperationCoordinator; use managers::transcription::TranscriptionManager; #[cfg(unix)] use signal_hook::consts::SIGUSR2; @@ -125,12 +126,14 @@ fn initialize_core_logic(app_handle: &AppHandle) { ); let history_manager = Arc::new(HistoryManager::new(app_handle).expect("Failed to initialize history manager")); + let operation_coordinator = Arc::new(OperationCoordinator::new(app_handle)); // Add managers to Tauri's managed state app_handle.manage(recording_manager.clone()); app_handle.manage(model_manager.clone()); app_handle.manage(transcription_manager.clone()); app_handle.manage(history_manager.clone()); + app_handle.manage(operation_coordinator.clone()); // Initialize the shortcuts shortcut::init_shortcuts(app_handle); diff --git a/src-tauri/src/managers/mod.rs b/src-tauri/src/managers/mod.rs index 1239dc26b..a02414da8 100644 --- a/src-tauri/src/managers/mod.rs +++ b/src-tauri/src/managers/mod.rs @@ -1,4 +1,5 @@ pub mod audio; pub mod history; pub mod model; +pub mod operation; pub mod transcription; diff --git a/src-tauri/src/managers/operation.rs b/src-tauri/src/managers/operation.rs new file mode 100644 index 000000000..5120059c4 --- /dev/null +++ b/src-tauri/src/managers/operation.rs @@ -0,0 +1,227 @@ +//! Operation Coordinator +//! +//! Manages the lifecycle of transcription operations to prevent race conditions +//! when rapidly toggling push-to-talk. Ensures only one operation is active at a time +//! and provides clean cancellation of stale operations. + +use log::{debug, info, warn}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Mutex; +use std::time::Instant; +use tauri::AppHandle; + +/// Represents the current phase of an operation +#[derive(Debug, Clone)] +pub enum OperationPhase { + /// No operation in progress + Idle, + /// Currently recording audio + Recording { + operation_id: u64, + binding_id: String, + #[allow(dead_code)] // Useful for debugging/logging + started_at: Instant, + }, + /// Recording stopped, transcription in progress + Processing { + operation_id: u64, + #[allow(dead_code)] // Useful for debugging/logging + binding_id: String, + #[allow(dead_code)] // Useful for debugging/logging + started_at: Instant, + }, +} + +impl OperationPhase { + pub fn operation_id(&self) -> Option { + match self { + OperationPhase::Idle => None, + OperationPhase::Recording { operation_id, .. } => Some(*operation_id), + OperationPhase::Processing { operation_id, .. } => Some(*operation_id), + } + } + + #[allow(dead_code)] // Public API for future use + pub fn is_idle(&self) -> bool { + matches!(self, OperationPhase::Idle) + } +} + +/// Coordinates transcription operations to prevent race conditions. +/// +/// The coordinator ensures: +/// 1. Only one operation can be active at a time +/// 2. Starting a new operation while one is in progress marks the old one as stale +/// 3. Operations can check if they're still valid before proceeding +/// 4. Clean state transitions with proper logging +pub struct OperationCoordinator { + /// Monotonically increasing operation ID. Each new operation gets a unique ID. + next_operation_id: AtomicU64, + + /// The currently active operation ID. Operations compare their ID to this + /// to determine if they should continue or abort. + active_operation_id: AtomicU64, + + /// The current phase of the operation lifecycle + phase: Mutex, + + /// App handle for potential UI updates + #[allow(dead_code)] + app_handle: AppHandle, +} + +impl OperationCoordinator { + pub fn new(app_handle: &AppHandle) -> Self { + Self { + next_operation_id: AtomicU64::new(1), + active_operation_id: AtomicU64::new(0), + phase: Mutex::new(OperationPhase::Idle), + app_handle: app_handle.clone(), + } + } + + /// Start a new recording operation. + /// + /// If an operation is already in progress, it will be marked as stale. + /// Returns the operation ID that should be used to track this operation. + pub fn start_recording(&self, binding_id: &str) -> u64 { + let operation_id = self.next_operation_id.fetch_add(1, Ordering::SeqCst); + let previous_active = self.active_operation_id.swap(operation_id, Ordering::SeqCst); + + let mut phase = self.phase.lock().unwrap(); + let previous_phase = phase.clone(); + + *phase = OperationPhase::Recording { + operation_id, + binding_id: binding_id.to_string(), + started_at: Instant::now(), + }; + + if previous_active != 0 { + warn!( + "Starting new operation {} while operation {} was still active (phase: {:?})", + operation_id, previous_active, previous_phase + ); + } else { + debug!( + "Started recording operation {} for binding '{}'", + operation_id, binding_id + ); + } + + operation_id + } + + /// Transition from recording to processing phase. + /// + /// Returns true if the transition was successful (operation is still active). + /// Returns false if the operation has been superseded. + pub fn transition_to_processing(&self, operation_id: u64) -> bool { + if !self.is_active(operation_id) { + debug!( + "Operation {} is no longer active, skipping transition to processing", + operation_id + ); + return false; + } + + let mut phase = self.phase.lock().unwrap(); + + // Verify we're transitioning from the right state + match &*phase { + OperationPhase::Recording { + operation_id: phase_op_id, + binding_id, + .. + } if *phase_op_id == operation_id => { + *phase = OperationPhase::Processing { + operation_id, + binding_id: binding_id.clone(), + started_at: Instant::now(), + }; + debug!("Operation {} transitioned to processing", operation_id); + true + } + _ => { + warn!( + "Cannot transition operation {} to processing from phase {:?}", + operation_id, *phase + ); + false + } + } + } + + /// Mark an operation as complete and return to idle state. + /// + /// Only completes if the given operation_id matches the active operation. + pub fn complete(&self, operation_id: u64) { + let was_active = self + .active_operation_id + .compare_exchange(operation_id, 0, Ordering::SeqCst, Ordering::SeqCst) + .is_ok(); + + if was_active { + let mut phase = self.phase.lock().unwrap(); + if let Some(phase_op_id) = phase.operation_id() { + if phase_op_id == operation_id { + info!("Operation {} completed successfully", operation_id); + *phase = OperationPhase::Idle; + } + } + } else { + debug!( + "Operation {} was already superseded, not marking as complete", + operation_id + ); + } + } + + /// Cancel the current operation and return to idle state. + /// + /// This is used for explicit cancellation (e.g., user pressing cancel). + pub fn cancel(&self) { + let previous_active = self.active_operation_id.swap(0, Ordering::SeqCst); + + let mut phase = self.phase.lock().unwrap(); + let previous_phase = phase.clone(); + *phase = OperationPhase::Idle; + + if previous_active != 0 { + info!( + "Cancelled operation {} (was in phase {:?})", + previous_active, previous_phase + ); + } + } + + /// Check if the given operation is still the active operation. + /// + /// Operations should call this before performing significant work + /// to avoid wasting resources on stale operations. + pub fn is_active(&self, operation_id: u64) -> bool { + self.active_operation_id.load(Ordering::SeqCst) == operation_id + } + + /// Get the current phase of the operation lifecycle. + #[allow(dead_code)] // Public API for debugging/monitoring + pub fn current_phase(&self) -> OperationPhase { + self.phase.lock().unwrap().clone() + } + + /// Check if any operation is currently active. + #[allow(dead_code)] // Public API for future use + pub fn has_active_operation(&self) -> bool { + self.active_operation_id.load(Ordering::SeqCst) != 0 + } + + /// Get the active operation ID, if any. + pub fn active_operation_id(&self) -> Option { + let id = self.active_operation_id.load(Ordering::SeqCst); + if id == 0 { + None + } else { + Some(id) + } + } +} diff --git a/src-tauri/src/managers/transcription.rs b/src-tauri/src/managers/transcription.rs index 2418db9db..d6669268b 100644 --- a/src-tauri/src/managers/transcription.rs +++ b/src-tauri/src/managers/transcription.rs @@ -316,17 +316,40 @@ impl TranscriptionManager { debug!("Audio vector length: {}", audio.len()); - if audio.len() == 0 { + if audio.is_empty() { debug!("Empty audio vector"); return Ok(String::new()); } // Check if model is loaded, if not try to load it { - // If the model is loading, wait for it to complete. + // If the model is loading, wait for it to complete with a timeout. + // This prevents indefinite hangs if model loading fails or gets stuck. let mut is_loading = self.is_loading.lock().unwrap(); + let timeout = Duration::from_secs(60); // 60 second timeout for model loading + let deadline = std::time::Instant::now() + timeout; + while *is_loading { - is_loading = self.loading_condvar.wait(is_loading).unwrap(); + let remaining = deadline.saturating_duration_since(std::time::Instant::now()); + if remaining.is_zero() { + warn!("Timed out waiting for model to load after {:?}", timeout); + return Err(anyhow::anyhow!( + "Timed out waiting for model to load. Please try again." + )); + } + + let (guard, wait_result) = self + .loading_condvar + .wait_timeout(is_loading, remaining) + .unwrap(); + is_loading = guard; + + if wait_result.timed_out() && *is_loading { + warn!("Timed out waiting for model to load after {:?}", timeout); + return Err(anyhow::anyhow!( + "Timed out waiting for model to load. Please try again." + )); + } } let engine_guard = self.engine.lock().unwrap(); diff --git a/src-tauri/src/utils.rs b/src-tauri/src/utils.rs index 636ef8ef2..ddd85c94a 100644 --- a/src-tauri/src/utils.rs +++ b/src-tauri/src/utils.rs @@ -1,4 +1,5 @@ use crate::managers::audio::AudioRecordingManager; +use crate::managers::operation::OperationCoordinator; use crate::shortcut; use crate::ManagedToggleState; use log::{info, warn}; @@ -19,7 +20,12 @@ pub fn cancel_current_operation(app: &AppHandle) { // Unregister the cancel shortcut asynchronously shortcut::unregister_cancel_shortcut(app); - // First, reset all shortcut toggle states. + // Cancel the current operation via the coordinator + // This marks any in-progress operation as stale, so async tasks will abort + let coordinator = app.state::>(); + coordinator.cancel(); + + // Reset all shortcut toggle states. // This is critical for non-push-to-talk mode where shortcuts toggle on/off let toggle_state_manager = app.state::(); if let Ok(mut states) = toggle_state_manager.lock() {