diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..e895a72 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,353 @@ +# Contributing to Meeting Summarizer Bot + +This document provides developer-focused guidance for working on the meeting summarizer bot. + +## Development Philosophy + +This project prioritizes **correctness and reliability** over performance. The bot runs on a low-resource VM and processes recordings in batches after meetings end, so: + +- Focus on stability and crash recovery +- Prefer simple, debuggable code over clever optimizations +- Log extensively for troubleshooting +- Make failures visible and explicit + +## Code Organization + +### Module Responsibilities + +| Module | Purpose | State? | +|--------|---------|--------| +| `main.py` | Discord bot, commands, reconnection | Stateless | +| `meeting_state.py` | Active meeting tracking | In-memory | +| `audio_recorder.py` | Voice recording | Stateless | +| `chunk_state_manager.py` | Chunk state, retry, persistence | **Persisted** | +| `recovery_manager.py` | Crash recovery logic | Stateless | + +## Chunk State System + +### Core Concepts + +**Chunk:** A recorded audio segment that must be transcribed + +**State:** Current processing stage (RECORDED, SENT, TRANSCRIBED) + +**Persistence:** All chunk state survives crashes + +**Retry:** Failed chunks are retried independently up to max attempts + +### State Transitions + +``` +RECORDED ──> SENT ──> TRANSCRIBED + │ + └──> RECORDED (on failure, for retry) +``` + +Invalid transitions are rejected and logged. 
+ +### Integration Points + +**When recording completes a chunk:** +```python +from chunk_state_manager import ChunkStateManager, ChunkState + +manager = ChunkStateManager() +chunk = manager.register_chunk( + chunk_id="unique-id", + meeting_id="meeting-id", + file_path="/path/to/chunk.wav" +) +# Chunk is now in RECORDED state +``` + +**When sending chunk for transcription:** +```python +# Mark as sent +manager.transition_chunk(chunk_id, ChunkState.SENT) + +# Call transcription API... +try: + result = await transcribe_api(chunk.file_path) + manager.set_transcription_result(chunk_id, result) + manager.transition_chunk(chunk_id, ChunkState.TRANSCRIBED) +except Exception as e: + # Mark failed + should_retry = manager.mark_chunk_failed(chunk_id, str(e)) + if should_retry: + # Schedule retry with backoff + delay = manager.get_retry_delay(chunk.retry_count) + await asyncio.sleep(delay) + # Process again... +``` + +**On bot startup:** +```python +from recovery_manager import RecoveryManager + +manager = ChunkStateManager() +recovery = RecoveryManager(manager) + +# Load state and get recovery summary +summary = recovery.recover_on_startup() + +# Get pending work +pending = recovery.get_pending_work() +for chunk in pending: + # Process chunk... +``` + +## Testing Your Changes + +### Unit Testing + +For isolated component tests, create files in `examples/`: + +```bash +python examples/test_your_component.py +``` + +### Integration Testing + +Test with the live bot: + +1. Start bot: `cd bot && python main.py` +2. Join a voice channel in Discord +3. Run `/start-meeting` +4. Test your feature +5. Check logs: `tail -f bot.log` +6. 
Inspect state: `ls -la state/` + +### Crash Testing + +Simulate crashes during different stages: + +```bash +# Start meeting +/start-meeting + +# Kill bot +kill + +# Restart bot +python main.py + +# Check recovery logs +grep "Recovery" bot.log +``` + +## Debugging Tips + +### Enable Verbose Logging + +Edit module logger level: + +```python +logging.getLogger('chunk_state_manager').setLevel(logging.DEBUG) +``` + +### Inspect Chunk State + +```python +from chunk_state_manager import ChunkStateManager + +manager = ChunkStateManager() +manager.load_state() + +# Get specific chunk +chunk = manager.get_chunk("chunk-id") +print(chunk.to_dict()) + +# Get all chunks for a meeting +chunks = manager.get_chunks_by_meeting("meeting-id") +for chunk in chunks: + print(f"{chunk.chunk_id}: {chunk.state.value}") + +# Check completion +is_complete, status = manager.is_meeting_complete("meeting-id") +print(f"Complete: {is_complete}, Status: {status}") +``` + +### Useful Log Filters + +```bash +# Failed chunks +grep "failed" bot.log | grep chunk + +# State transitions +grep "transitioned" bot.log + +# Retries +grep "will be retried" bot.log + +# Recovery +grep -A 20 "RECOVERY REPORT" bot.log +``` + +### Manual State Inspection + +State files are human-readable JSON: + +```bash +# View a chunk +cat state/.json | python -m json.tool + +# Count states +jq -r '.state' state/*.json | sort | uniq -c +``` + +## Code Style + +### General Guidelines + +- **No emojis** in code, logs, or user messages (use clear text) +- Use explicit variable names (`chunk_state` not `cs`) +- Log state changes at INFO level +- Log errors with context (chunk ID, meeting ID) +- Validate inputs and fail fast with clear messages + +### Error Handling + +```python +# Good: Clear, actionable error +if chunk_id not in self._chunks: + logger.error(f"Cannot transition unknown chunk: {chunk_id}") + return False + +# Bad: Silent failure +if chunk_id not in self._chunks: + return False +``` + +### Logging + +```python +# 
Good: Structured, searchable +logger.info( + f"Chunk {chunk_id} transitioned: {old_state} -> {new_state}" +) + +# Bad: Unstructured +logger.info("Changed state") +``` + +## Common Pitfalls + +### 1. Forgetting to Persist State + +**Wrong:** +```python +chunk.state = ChunkState.TRANSCRIBED # Not persisted! +``` + +**Right:** +```python +manager.transition_chunk(chunk_id, ChunkState.TRANSCRIBED) +``` + +### 2. Not Checking Retry Count + +**Wrong:** +```python +# Infinite retries! +manager.mark_chunk_failed(chunk_id, error) +# Process again... +``` + +**Right:** +```python +should_retry = manager.mark_chunk_failed(chunk_id, error) +if should_retry: + # Process again +else: + # Log permanent failure, alert, etc. +``` + +### 3. Assuming State Loaded + +**Wrong:** +```python +manager = ChunkStateManager() +chunk = manager.get_chunk(chunk_id) # May be None! +``` + +**Right:** +```python +manager = ChunkStateManager() +manager.load_state() # Load persisted state first +chunk = manager.get_chunk(chunk_id) +if chunk is None: + logger.error(f"Chunk {chunk_id} not found") + return +``` + +### 4. Not Validating Transitions + +**Wrong:** +```python +chunk.state = new_state # Bypasses validation +``` + +**Right:** +```python +success = chunk.transition_to(new_state) +if not success: + logger.error("Invalid transition") + return +``` + +## Performance Considerations + +This bot is **not** performance-critical. Optimize for correctness first. + +**Acceptable:** +- One chunk at a time processing +- JSON file per chunk +- Synchronous I/O for state persistence + +**Not Worth It:** +- Parallel chunk processing +- In-memory caching layers +- Async everything + +**Watch Out For:** +- Accumulating state files (clear completed meetings) +- Large transcription results (store references, not full text) +- Unbounded retry loops + +## Extending the System + +### Adding New Chunk States + +1. Add state to `ChunkState` enum +2. Update valid transitions in `ChunkMetadata.transition_to()` +3. 
Update `get_summary()` to count new state +4. Update documentation + +### Adding New Retry Strategies + +Edit `ChunkStateManager.get_retry_delay()`: + +```python +def get_retry_delay(self, retry_count: int) -> float: + # Linear backoff + return self.retry_base_delay * retry_count + + # Exponential with cap + return min( + self.retry_base_delay * (2 ** retry_count), + 60 # Max 60 seconds + ) +``` + +### Adding Metrics/Monitoring + +```python +def get_metrics(self, meeting_id: str) -> dict: + chunks = self.get_chunks_by_meeting(meeting_id) + return { + "total_chunks": len(chunks), + "avg_retry_count": sum(c.retry_count for c in chunks) / len(chunks), + "max_retry_count": max(c.retry_count for c in chunks), + "failed_chunks": len(self.get_failed_chunks(meeting_id)) + } +``` \ No newline at end of file diff --git a/README.md b/README.md index e69de29..5ffe634 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,396 @@ +# Meeting Summarizer Bot + +An internal Discord bot for recording and summarizing voice meetings. Built for RUXAILAB to help contributors catch up on missed meetings. + +## Overview + +The bot records Discord voice channel meetings in fixed-size chunks, transcribes them using a cloud STT API (Groq), and posts structured summaries to text channels. 
+ +**Key Features:** +- Chunk-based audio recording with state tracking +- Automatic retry on transcription failures +- Crash recovery (survives process/VM restarts) +- Post-meeting batch processing +- Near-zero cost operation (free tier APIs) + +**Deployment Target:** +- Google Cloud e2-micro VM (always-on) +- Focus on stability and reliability over performance + +## Project Structure + +``` +meeting-summarizer/ +├── bot/ +│ ├── main.py # Discord bot entry point +│ ├── meeting_state.py # Meeting session management +│ ├── audio_recorder.py # Audio recording logic +│ ├── chunk_state_manager.py # Chunk state tracking & retry +│ └── recovery_manager.py # Crash recovery system +├── state/ # Chunk state persistence (JSON files) +├── recordings/ # Audio recordings by meeting +├── requirements.txt +└── README.md +``` + +## Setup Instructions + +### Prerequisites + +- Python 3.8+ +- Discord bot token +- Voice-capable Discord server for testing + +### Installation + +1. **Clone the repository** + ```bash + git clone + cd meeting-summarizer + ``` + +2. **Create a virtual environment** + ```bash + python3 -m venv venv + source venv/bin/activate # On Windows: venv\Scripts\activate + ``` + +3. **Install dependencies** + ```bash + pip install -r requirements.txt + ``` + +4. **Configure environment variables** + ```bash + cp .env.example .env + ``` + + Edit `.env` and add your Discord bot token: + ``` + DISCORD_BOT_TOKEN=your_bot_token_here + ``` + +5. **Create required directories** + ```bash + mkdir -p recordings state + ``` + +### Running the Bot + +```bash +cd bot +python main.py +``` + +The bot will: +- Load persisted chunk state (if any) +- Connect to Discord +- Register slash commands +- Begin crash recovery if needed + +## Bot Commands + +| Command | Description | +|---------|-------------| +| `/start-meeting` | Join your voice channel and start recording | +| `/end-meeting` | Stop recording and leave voice channel | + +## How It Works + +### Recording Flow + +1. 
User runs `/start-meeting` while in a voice channel +2. Bot joins voice channel and creates a meeting session +3. Audio is recorded per-user in the `recordings//` directory +4. Each recording chunk is tracked with state: `RECORDED` → `SENT` → `TRANSCRIBED` +5. User runs `/end-meeting` to stop recording + +### Chunk State System + +Each audio chunk has an explicit state: + +- **RECORDED** - Chunk fully written to disk +- **SENT** - Chunk submitted for transcription +- **TRANSCRIBED** - Transcription completed successfully + +**State transitions:** +- `RECORDED → SENT` (when submitting to API) +- `SENT → TRANSCRIBED` (on success) +- `SENT → RECORDED` (on failure, for retry) + +All state is persisted to `state/.json` files. + +### Retry Mechanism + +Failed chunks are automatically retried with: +- Independent retry per chunk +- Exponential backoff (2s, 4s, 8s, 16s, 32s) +- Maximum 5 retry attempts +- Clear logging of each attempt + +Chunks exceeding max retries are marked as permanently failed and require manual intervention. + +### Crash Recovery + +On bot startup: +1. All chunk states are loaded from `state/` directory +2. Incomplete meetings are identified +3. Pending chunks (in `RECORDED` state) are queued for processing +4. Failed chunks are retried if under max retry limit + +## Testing Guide + +### Testing Locally + +1. **Start the bot** + ```bash + cd bot + python main.py + ``` + +2. **Join a voice channel in Discord** + +3. **Start a test meeting** + ``` + /start-meeting + ``` + +4. **Speak or play audio** (bot records all participants) + +5. **End the meeting** + ``` + /end-meeting + ``` + +6. 
**Check recordings** + ```bash + ls -la recordings// + ``` + +### Testing Chunk State Tracking + +See [examples/test_chunk_state.py](examples/test_chunk_state.py) for a standalone test demonstrating: +- Chunk registration +- State transitions +- Retry handling +- Persistence and recovery + +Run the test: +```bash +python examples/test_chunk_state.py +``` + +### Testing Crash Recovery + +**Simulate a crash during recording:** + +1. Start a meeting with `/start-meeting` +2. Kill the bot process (Ctrl+C or `kill `) +3. Restart the bot +4. Check logs for recovery report + +**Expected behavior:** +- Bot loads persisted chunk states +- Identifies incomplete meetings +- Logs a recovery report with counts +- Pending chunks remain visible until processed + +**Verify recovery:** +```bash +# Check state files +ls -la state/ + +# Inspect a chunk state file +cat state/.json +``` + +**Simulate chunk processing failure:** + +This requires integration with the transcription API (out of scope for this sub-issue), but you can manually test state transitions: + +```python +from bot.chunk_state_manager import ChunkStateManager, ChunkState + +manager = ChunkStateManager() +manager.load_state() + +# Simulate failed chunk +chunk_id = "test-chunk-001" +manager.mark_chunk_failed(chunk_id, "Simulated API timeout") + +# Check retry count +chunk = manager.get_chunk(chunk_id) +print(f"Retry count: {chunk.retry_count}") +print(f"State: {chunk.state}") +``` + +### Testing Meeting Completion + +A meeting is only complete when **all chunks are TRANSCRIBED**. 
+ +```python +from bot.chunk_state_manager import ChunkStateManager + +manager = ChunkStateManager() +manager.load_state() + +# Check meeting completion +is_complete, status = manager.is_meeting_complete("") +print(f"Complete: {is_complete}") +print(f"Status: {status}") + +# Get summary +summary = manager.get_summary("") +print(f"Summary: {summary}") +``` + +## Debugging + +### Where to Look When Something Fails + +| Issue | Where to Check | +|-------|----------------| +| Bot won't start | `bot.log` (check token, permissions) | +| Recording not working | `bot.log` (audio sink errors) | +| Chunks not tracked | `state/` directory (should have .json files) | +| Chunks stuck in SENT | Check retry count in state file | +| Chunks permanently failed | `bot.log` (search for "exceeded max retries") | +| Meeting not completing | Run `get_summary()` to see chunk states | + +### Log Locations + +- **Bot logs:** `bot.log` (in the directory where you run the bot) +- **Chunk state files:** `state/.json` +- **Audio recordings:** `recordings//` + +### Useful Log Searches + +```bash +# Find failed chunks +grep "exceeded max retries" bot.log + +# Find retry attempts +grep "will be retried" bot.log + +# Find state transitions +grep "transitioned:" bot.log + +# Find recovery operations +grep "Recovery" bot.log +``` + +### Inspecting State + +```bash +# List all chunk states +ls -la state/ + +# View a specific chunk +cat state/.json | python -m json.tool + +# Count chunks by meeting +for file in state/*.json; do + meeting_id=$(jq -r '.meeting_id' "$file") + echo "$meeting_id" +done | sort | uniq -c +``` + +### Common Issues + +**Issue:** Chunks not persisting after crash +- **Cause:** State directory doesn't exist or insufficient permissions +- **Fix:** Ensure `state/` directory exists with write permissions + +**Issue:** Chunks stuck in SENT state after restart +- **Cause:** Bot crashed while waiting for transcription +- **Fix:** Recovery manager will reset to RECORDED on startup 
(check logs) + +**Issue:** Meeting shows complete but summary is empty +- **Cause:** Chunks were cleared or state files deleted +- **Fix:** Check `recordings/` for audio files and regenerate state if needed + +## Configuration + +### Retry Settings + +Edit `bot/chunk_state_manager.py`: + +```python +class ChunkStateManager: + def __init__(self, state_dir: str = "state"): + # ... + self.max_retries = 5 # Maximum retry attempts + self.retry_base_delay = 2 # Base delay in seconds + self.use_exponential_backoff = True # Enable exponential backoff +``` + +### Reconnection Settings + +Edit `bot/main.py`: + +```python +class MeetingBot(commands.Bot): + def __init__(self): + # ... + self.max_reconnect_attempts = 3 + self.reconnect_base_delay = 2 +``` + +## Architecture Notes + +### Design Decisions + +1. **JSON persistence over database** + - Simpler deployment (no DB setup) + - File-based is sufficient for low volume + - Easy to inspect/debug manually + +2. **Chunk-based processing** + - Limits memory usage + - Enables incremental progress + - Makes failures recoverable + +3. **Explicit state transitions** + - No silent failures + - Clear audit trail + - Easy to debug + +4. **Independent retry per chunk** + - One failed chunk doesn't block others + - Idempotent retry logic + - Prevents cascading failures + +### State Persistence + +Each chunk has a JSON file in `state/`: + +```json +{ + "chunk_id": "abc123", + "meeting_id": "def456", + "file_path": "recordings/def456/chunk_001.wav", + "state": "recorded", + "created_at": "2026-01-25T10:00:00", + "updated_at": "2026-01-25T10:00:05", + "retry_count": 0, + "last_error": null, + "transcription_result": null +} +``` + +### Meeting Completion Guarantee + +A meeting is considered complete **only when**: +- All chunks exist in state +- All chunks are in `TRANSCRIBED` state +- No chunks have permanently failed (exceeded max retries) + +This is validated with `ChunkStateManager.is_meeting_complete()`. 
"""
Chunk-level state tracking and retry mechanism.

This module provides:
- Per-chunk state tracking (RECORDED, SENT, TRANSCRIBED)
- Independent chunk retry handling
- Crash recovery through JSON persistence
- Meeting completion guarantee (all chunks must be TRANSCRIBED)
"""

import json
import logging
import time
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Produces the same textual format as the deprecated
    ``datetime.utcnow().isoformat()`` (no UTC offset suffix), so state files
    written by earlier versions and by this one stay interchangeable.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


class ChunkState(Enum):
    """States for audio chunk processing."""
    RECORDED = "recorded"        # Chunk fully written to disk
    SENT = "sent"                # Chunk submitted for transcription
    TRANSCRIBED = "transcribed"  # Transcription completed successfully


class ChunkMetadata:
    """Metadata for a single audio chunk."""

    def __init__(
        self,
        chunk_id: str,
        meeting_id: str,
        file_path: str,
        state: ChunkState = ChunkState.RECORDED
    ):
        """
        Create metadata for a chunk.

        Args:
            chunk_id: Unique identifier for the chunk
            meeting_id: Meeting this chunk belongs to
            file_path: Path to the recorded audio file
            state: Initial processing state (defaults to RECORDED)
        """
        now = _utc_now_iso()
        self.chunk_id = chunk_id
        self.meeting_id = meeting_id
        self.file_path = file_path
        self.state = state
        self.created_at = now
        self.updated_at = now
        self.retry_count = 0
        self.last_error: Optional[str] = None
        self.transcription_result: Optional[str] = None

    def transition_to(self, new_state: ChunkState) -> bool:
        """
        Validate and perform state transition.

        Valid transitions:
        - RECORDED -> SENT
        - SENT -> TRANSCRIBED
        - SENT -> RECORDED (retry)

        Returns:
            bool: True if transition is valid and completed, False otherwise
        """
        valid_transitions = {
            ChunkState.RECORDED: [ChunkState.SENT],
            ChunkState.SENT: [ChunkState.TRANSCRIBED, ChunkState.RECORDED],
            ChunkState.TRANSCRIBED: []  # Terminal state
        }

        if new_state not in valid_transitions[self.state]:
            logger.error(
                f"Invalid state transition for chunk {self.chunk_id}: "
                f"{self.state.value} -> {new_state.value}"
            )
            return False

        old_state = self.state
        self.state = new_state
        self.updated_at = _utc_now_iso()

        logger.info(
            f"Chunk {self.chunk_id} transitioned: {old_state.value} -> {new_state.value}"
        )
        return True

    def mark_failed(self, error: str) -> None:
        """Mark chunk as failed and increment retry count.

        Only the failure bookkeeping is updated here; the state itself is
        left untouched.
        """
        self.retry_count += 1
        self.last_error = error
        self.updated_at = _utc_now_iso()
        logger.warning(
            f"Chunk {self.chunk_id} failed (attempt {self.retry_count}): {error}"
        )

    def to_dict(self) -> dict:
        """Serialize to dictionary for JSON persistence."""
        return {
            "chunk_id": self.chunk_id,
            "meeting_id": self.meeting_id,
            "file_path": self.file_path,
            "state": self.state.value,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "retry_count": self.retry_count,
            "last_error": self.last_error,
            "transcription_result": self.transcription_result
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ChunkMetadata":
        """Deserialize from dictionary.

        Raises:
            KeyError: If a required key is missing from ``data``
            ValueError: If ``data["state"]`` is not a known ChunkState value
        """
        chunk = cls(
            chunk_id=data["chunk_id"],
            meeting_id=data["meeting_id"],
            file_path=data["file_path"],
            state=ChunkState(data["state"])
        )
        chunk.created_at = data["created_at"]
        chunk.updated_at = data["updated_at"]
        chunk.retry_count = data["retry_count"]
        chunk.last_error = data.get("last_error")
        chunk.transcription_result = data.get("transcription_result")
        return chunk

    def __repr__(self) -> str:
        return (
            f"ChunkMetadata(id={self.chunk_id}, state={self.state.value}, "
            f"retries={self.retry_count})"
        )


class ChunkStateManager:
    """
    Manages chunk state tracking, persistence, and retry logic.

    Ensures:
    - No chunk is lost or silently ignored
    - Failed chunks are retried independently
    - State survives process crashes
    - Meeting is only complete when all chunks are TRANSCRIBED
    """

    def __init__(self, state_dir: str = "state"):
        """
        Create a manager that persists chunk state under ``state_dir``.

        The directory is created if missing. Previously persisted state is
        NOT loaded here; call ``load_state()`` for that.
        """
        self.state_dir = Path(state_dir)
        self.state_dir.mkdir(parents=True, exist_ok=True)

        # In-memory chunk tracking
        self._chunks: Dict[str, ChunkMetadata] = {}

        # Retry configuration
        self.max_retries = 5
        self.retry_base_delay = 2  # seconds
        self.use_exponential_backoff = True

        logger.info(f"ChunkStateManager initialized with state_dir: {self.state_dir}")

    def _is_permanently_failed(self, chunk: ChunkMetadata) -> bool:
        """Return True if the chunk used up its retries without being transcribed."""
        return (
            chunk.retry_count >= self.max_retries
            and chunk.state != ChunkState.TRANSCRIBED
        )

    def register_chunk(
        self,
        chunk_id: str,
        meeting_id: str,
        file_path: str
    ) -> ChunkMetadata:
        """
        Register a new chunk in RECORDED state.

        Registering an already-known chunk ID logs a warning and returns the
        existing metadata unchanged.

        Args:
            chunk_id: Unique identifier for the chunk
            meeting_id: Meeting this chunk belongs to
            file_path: Path to the recorded audio file

        Returns:
            ChunkMetadata: The created chunk metadata
        """
        if chunk_id in self._chunks:
            logger.warning(f"Chunk {chunk_id} already registered")
            return self._chunks[chunk_id]

        chunk = ChunkMetadata(
            chunk_id=chunk_id,
            meeting_id=meeting_id,
            file_path=file_path,
            state=ChunkState.RECORDED
        )

        self._chunks[chunk_id] = chunk
        self._persist_chunk(chunk)

        logger.info(f"Registered chunk {chunk_id} for meeting {meeting_id}")
        return chunk

    def transition_chunk(self, chunk_id: str, new_state: ChunkState) -> bool:
        """
        Transition a chunk to a new state.

        The new state is persisted only when the transition is valid.

        Args:
            chunk_id: ID of the chunk to transition
            new_state: Target state

        Returns:
            bool: True if transition successful, False otherwise
        """
        if chunk_id not in self._chunks:
            logger.error(f"Cannot transition unknown chunk: {chunk_id}")
            return False

        chunk = self._chunks[chunk_id]
        success = chunk.transition_to(new_state)

        if success:
            self._persist_chunk(chunk)

        return success

    def mark_chunk_failed(self, chunk_id: str, error: str) -> bool:
        """
        Mark a chunk as failed and determine if retry is needed.

        When retries remain, the chunk is put back into RECORDED state so it
        can be picked up again. When the retry budget is exhausted the chunk
        keeps its current state and is reported by ``get_failed_chunks()``.

        Args:
            chunk_id: ID of the failed chunk
            error: Error message describing the failure

        Returns:
            bool: True if chunk should be retried, False if max retries exceeded
        """
        if chunk_id not in self._chunks:
            logger.error(f"Cannot mark unknown chunk as failed: {chunk_id}")
            return False

        chunk = self._chunks[chunk_id]
        chunk.mark_failed(error)

        if chunk.retry_count >= self.max_retries:
            logger.error(
                f"Chunk {chunk_id} exceeded max retries ({self.max_retries}). "
                f"Manual intervention required."
            )
            self._persist_chunk(chunk)
            return False

        # Reset to RECORDED state for retry. A chunk that is already RECORDED
        # is retry-ready as-is; attempting RECORDED -> RECORDED would only log
        # a misleading "Invalid state transition" error.
        if chunk.state != ChunkState.RECORDED:
            chunk.transition_to(ChunkState.RECORDED)
        self._persist_chunk(chunk)

        logger.info(f"Chunk {chunk_id} will be retried (attempt {chunk.retry_count + 1})")
        return True

    def set_transcription_result(self, chunk_id: str, result: str) -> None:
        """
        Store the transcription result for a chunk.

        This does not change the chunk's state; callers transition to
        TRANSCRIBED separately.

        Args:
            chunk_id: ID of the chunk
            result: Transcription text
        """
        if chunk_id not in self._chunks:
            logger.error(f"Cannot set result for unknown chunk: {chunk_id}")
            return

        chunk = self._chunks[chunk_id]
        chunk.transcription_result = result
        self._persist_chunk(chunk)

        logger.info(f"Transcription result stored for chunk {chunk_id}")

    def get_chunk(self, chunk_id: str) -> Optional[ChunkMetadata]:
        """Get chunk metadata by ID, or None if the chunk is not tracked."""
        return self._chunks.get(chunk_id)

    def get_chunks_by_meeting(self, meeting_id: str) -> List[ChunkMetadata]:
        """Get all chunks for a specific meeting."""
        return [
            chunk for chunk in self._chunks.values()
            if chunk.meeting_id == meeting_id
        ]

    def get_chunks_by_state(
        self,
        state: ChunkState,
        meeting_id: Optional[str] = None
    ) -> List[ChunkMetadata]:
        """
        Get all chunks in a specific state.

        Args:
            state: State to filter by
            meeting_id: Optional meeting ID to further filter

        Returns:
            List of chunks matching the criteria
        """
        chunks = [chunk for chunk in self._chunks.values() if chunk.state == state]

        if meeting_id:
            chunks = [chunk for chunk in chunks if chunk.meeting_id == meeting_id]

        return chunks

    def get_failed_chunks(
        self,
        meeting_id: Optional[str] = None
    ) -> List[ChunkMetadata]:
        """
        Get all chunks that have failed and exceeded max retries.

        These chunks require manual intervention.

        Args:
            meeting_id: Optional meeting ID to filter by
        """
        chunks = [
            chunk for chunk in self._chunks.values()
            if self._is_permanently_failed(chunk)
        ]

        if meeting_id:
            chunks = [chunk for chunk in chunks if chunk.meeting_id == meeting_id]

        return chunks

    def is_meeting_complete(self, meeting_id: str) -> Tuple[bool, str]:
        """
        Check if all chunks for a meeting are TRANSCRIBED.

        Args:
            meeting_id: Meeting ID to check

        Returns:
            Tuple of (is_complete, status_message)
        """
        chunks = self.get_chunks_by_meeting(meeting_id)

        if not chunks:
            return False, "No chunks found for this meeting"

        total = len(chunks)
        transcribed = sum(1 for c in chunks if c.state == ChunkState.TRANSCRIBED)
        # Count failures from this meeting's own chunks so the figure can
        # never include chunks belonging to another meeting.
        failed = sum(1 for c in chunks if self._is_permanently_failed(c))

        if transcribed == total:
            return True, f"All {total} chunks transcribed successfully"

        if failed > 0:
            return False, f"{failed} chunk(s) failed permanently (max retries exceeded)"

        return False, f"{transcribed}/{total} chunks transcribed"

    def get_retry_delay(self, retry_count: int) -> float:
        """
        Calculate retry delay with optional exponential backoff.

        With backoff enabled the delay is ``retry_base_delay * 2 ** retry_count``;
        otherwise it is always ``retry_base_delay``.

        Args:
            retry_count: Number of retries already attempted

        Returns:
            Delay in seconds
        """
        if self.use_exponential_backoff:
            return self.retry_base_delay * (2 ** retry_count)
        return self.retry_base_delay

    def _persist_chunk(self, chunk: ChunkMetadata) -> None:
        """Persist chunk state to disk.

        The JSON is written to a sibling temporary file and then renamed over
        the real state file, so a process crash mid-write cannot replace the
        previous good state with a truncated file. Failures are logged, not
        raised, so persistence problems never interrupt chunk processing.
        """
        tmp_file: Optional[Path] = None
        try:
            state_file = self._get_chunk_state_file(chunk.chunk_id)
            state_file.parent.mkdir(parents=True, exist_ok=True)

            # "<id>.json.tmp" does not match the "*.json" glob in load_state(),
            # so a leftover temp file is never mistaken for chunk state.
            tmp_file = state_file.with_name(state_file.name + ".tmp")
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(chunk.to_dict(), f, indent=2)

            tmp_file.replace(state_file)

        except Exception as e:
            logger.error(f"Failed to persist chunk {chunk.chunk_id}: {e}")
            if tmp_file is not None:
                try:
                    tmp_file.unlink()
                except OSError:
                    # Temp file was never created or is already gone.
                    pass

    def _get_chunk_state_file(self, chunk_id: str) -> Path:
        """Get the path to a chunk's state file."""
        return self.state_dir / f"{chunk_id}.json"

    def load_state(self) -> int:
        """
        Load all persisted chunk state from disk.

        Called on startup to recover from crashes. Files that cannot be read
        or parsed are logged and skipped; they do not abort the load.

        Returns:
            Number of chunks loaded
        """
        loaded_count = 0

        try:
            for state_file in self.state_dir.glob("*.json"):
                try:
                    with open(state_file, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                    chunk = ChunkMetadata.from_dict(data)
                    self._chunks[chunk.chunk_id] = chunk
                    loaded_count += 1

                except Exception as e:
                    logger.error(f"Failed to load state file {state_file}: {e}")

            if loaded_count > 0:
                logger.info(f"Loaded {loaded_count} chunk(s) from disk")

        except Exception as e:
            logger.error(f"Failed to load chunk state: {e}")

        return loaded_count

    def get_summary(self, meeting_id: Optional[str] = None) -> dict:
        """
        Get a summary of chunk states.

        Args:
            meeting_id: Optional meeting ID to filter by

        Returns:
            Dictionary with state counts and details
        """
        chunks_list = list(self._chunks.values())
        if meeting_id:
            chunks_list = [c for c in chunks_list if c.meeting_id == meeting_id]

        def count_in(state: ChunkState) -> int:
            return sum(1 for c in chunks_list if c.state == state)

        return {
            "total": len(chunks_list),
            "recorded": count_in(ChunkState.RECORDED),
            "sent": count_in(ChunkState.SENT),
            "transcribed": count_in(ChunkState.TRANSCRIBED),
            "failed": sum(1 for c in chunks_list if self._is_permanently_failed(c))
        }

    def clear_meeting(self, meeting_id: str) -> int:
        """
        Clear all chunk state for a completed meeting.

        Only clears chunks that are TRANSCRIBED or permanently failed.

        Args:
            meeting_id: Meeting ID to clear

        Returns:
            Number of chunks cleared
        """
        cleared_count = 0

        # Only clear terminal states. Collect IDs first so the dict is not
        # mutated while it is being iterated.
        chunks_to_remove = [
            chunk_id for chunk_id, chunk in self._chunks.items()
            if chunk.meeting_id == meeting_id
            and (
                chunk.state == ChunkState.TRANSCRIBED
                or chunk.retry_count >= self.max_retries
            )
        ]

        for chunk_id in chunks_to_remove:
            try:
                # Remove from disk first: if the unlink fails, the chunk stays
                # tracked in memory too, so memory and disk never disagree and
                # the chunk cannot silently reappear on the next load_state().
                state_file = self._get_chunk_state_file(chunk_id)
                if state_file.exists():
                    state_file.unlink()

                # Remove from memory
                del self._chunks[chunk_id]

                cleared_count += 1

            except OSError as e:
                logger.error(f"Failed to clear chunk {chunk_id}: {e}")

        if cleared_count > 0:
            logger.info(f"Cleared {cleared_count} chunk(s) for meeting {meeting_id}")

        return cleared_count
) @@ -188,7 +188,7 @@ async def start_meeting(interaction: discord.Interaction): user = interaction.user if not isinstance(user, discord.Member) or not user.voice: await interaction.followup.send( - "❌ You must be in a voice channel to start a meeting.", + "Error: You must be in a voice channel to start a meeting.", ephemeral=True ) return @@ -197,7 +197,7 @@ async def start_meeting(interaction: discord.Interaction): if bot.meeting_manager.is_meeting_active(guild_id): existing = bot.meeting_manager.get_active_meeting(guild_id) await interaction.followup.send( - f"❌ A meeting is already in progress (ID: `{existing.meeting_id[:8]}`)\n" + f"Error: A meeting is already in progress (ID: `{existing.meeting_id[:8]}`)\n" f"Use `/end-meeting` to stop it first.", ephemeral=True ) @@ -212,7 +212,7 @@ async def start_meeting(interaction: discord.Interaction): ) if not success: - await interaction.followup.send(f"❌ Failed to start meeting: {error}", ephemeral=True) + await interaction.followup.send(f"Error: Failed to start meeting: {error}", ephemeral=True) return try: @@ -222,13 +222,13 @@ async def start_meeting(interaction: discord.Interaction): voice_client = interaction.guild.voice_client if not voice_client: bot.meeting_manager.end_meeting(guild_id) - await interaction.followup.send("❌ Failed to connect to voice channel", ephemeral=True) + await interaction.followup.send("Error: Failed to connect to voice channel", ephemeral=True) return except Exception as e: logger.error(f"Voice connection error: {e}") bot.meeting_manager.end_meeting(guild_id) await interaction.followup.send( - f"❌ Failed to connect to voice: {str(e)}", + f"Error: Failed to connect to voice: {str(e)}", ephemeral=True ) return @@ -244,16 +244,16 @@ async def start_meeting(interaction: discord.Interaction): await voice_client.disconnect(force=False) bot.meeting_manager.end_meeting(guild_id) await interaction.followup.send( - f"❌ Failed to start recording: {rec_error}", + f"Error: Failed to start 
recording: {rec_error}", ephemeral=True ) return await interaction.followup.send( - f"✅ **Meeting started!**\n" - f"📍 Voice Channel: {voice_channel.mention}\n" - f"🆔 Meeting ID: `{session.meeting_id[:8]}`\n" - f"🎙️ Recording in progress...\n\n" + f"**Meeting started successfully**\n" + f"Voice Channel: {voice_channel.mention}\n" + f"Meeting ID: `{session.meeting_id[:8]}`\n" + f"Recording in progress...\n\n" f"Use `/end-meeting` to stop recording." ) @@ -265,7 +265,7 @@ async def start_meeting(interaction: discord.Interaction): except Exception as e: logger.error(f"Unexpected error in start-meeting: {e}", exc_info=True) await interaction.followup.send( - "❌ An unexpected error occurred. Please try again.", + "Error: An unexpected error occurred. Please try again.", ephemeral=True ) @@ -277,7 +277,7 @@ async def end_meeting(interaction: discord.Interaction): guild_id = interaction.guild.id if not bot.meeting_manager.is_meeting_active(guild_id): await interaction.followup.send( - "❌ No active meeting to end.", + "Error: No active meeting to end.", ephemeral=True ) return @@ -288,7 +288,7 @@ async def end_meeting(interaction: discord.Interaction): if not voice_client: bot.meeting_manager.end_meeting(guild_id) await interaction.followup.send( - "⚠️ Meeting state cleaned up (bot was not in voice channel).", + "Warning: Meeting state cleaned up (bot was not in voice channel).", ephemeral=True ) return @@ -308,7 +308,7 @@ async def end_meeting(interaction: discord.Interaction): success, ended_session, error = bot.meeting_manager.end_meeting(guild_id) if not success: - await interaction.followup.send(f"⚠️ {error}", ephemeral=True) + await interaction.followup.send(f"Warning: {error}", ephemeral=True) return duration = ended_session.duration_seconds() @@ -316,14 +316,14 @@ async def end_meeting(interaction: discord.Interaction): seconds = int(duration % 60) response = ( - f"✅ **Meeting ended!**\n" - f"🆔 Meeting ID: `{ended_session.meeting_id[:8]}`\n" - f"⏱️ Duration: 
{minutes}m {seconds}s\n" + f"**Meeting ended successfully**\n" + f"Meeting ID: `{ended_session.meeting_id[:8]}`\n" + f"Duration: {minutes}m {seconds}s\n" ) if recording_info: - response += f"👥 Recorded {recording_info['user_count']} user(s)\n" - response += f"📁 Saved to: `{recording_info['output_dir']}`" + response += f"Recorded {recording_info['user_count']} user(s)\n" + response += f"Saved to: `{recording_info['output_dir']}`" await interaction.followup.send(response) @@ -343,7 +343,7 @@ async def end_meeting(interaction: discord.Interaction): pass await interaction.followup.send( - "⚠️ Meeting ended with errors. State has been cleaned up.", + "Warning: Meeting ended with errors. State has been cleaned up.", ephemeral=True ) diff --git a/bot/meeting_state.py b/bot/meeting_state.py index a47c737..d5192a0 100644 --- a/bot/meeting_state.py +++ b/bot/meeting_state.py @@ -1,6 +1,6 @@ from datetime import datetime from enum import Enum -from typing import Optional, Dict +from typing import Optional, Dict, List import uuid @@ -27,6 +27,16 @@ def __init__( self.end_timestamp: Optional[datetime] = None self.status = MeetingStatus.RECORDING self.recording_path: Optional[str] = None + self.chunk_ids: List[str] = [] # Track chunks for this meeting + + def add_chunk(self, chunk_id: str) -> None: + """Register a chunk ID with this meeting.""" + if chunk_id not in self.chunk_ids: + self.chunk_ids.append(chunk_id) + + def get_chunk_count(self) -> int: + """Get the number of chunks in this meeting.""" + return len(self.chunk_ids) def end(self) -> None: self.status = MeetingStatus.ENDED diff --git a/bot/recovery_manager.py b/bot/recovery_manager.py new file mode 100644 index 0000000..b55b15a --- /dev/null +++ b/bot/recovery_manager.py @@ -0,0 +1,294 @@ +""" +Crash recovery mechanism for the meeting summarizer bot. 
class RecoveryManager:
    """
    Manages recovery from crashes and restarts.

    On bot startup:
    1. Load all persisted chunk states
    2. Move chunks that a crash left in SENT back to RECORDED
    3. Identify incomplete meetings
    4. Resume processing from last known state
    """

    def __init__(self, chunk_manager: ChunkStateManager):
        """
        Args:
            chunk_manager: Chunk state manager whose persisted state is
                loaded, inspected and updated by this recovery manager
        """
        self.chunk_manager = chunk_manager

    def recover_on_startup(self) -> dict:
        """
        Perform recovery operations on bot startup.

        Loads persisted chunk state, returns chunks that were left in SENT
        to RECORDED so they become eligible for processing again, and then
        analyzes what work remains.

        Returns:
            Dictionary with recovery summary (keys: loaded_chunks,
            incomplete_meetings, pending_chunks, failed_chunks)
        """
        logger.info("Starting crash recovery process...")

        # Load all persisted chunk states
        loaded_count = self.chunk_manager.load_state()

        if loaded_count == 0:
            logger.info("No persisted state found. Starting fresh.")
            return {
                "loaded_chunks": 0,
                "incomplete_meetings": [],
                "pending_chunks": [],
                "failed_chunks": []
            }

        # A request that was in flight when the process died cannot complete
        # after a restart. get_pending_work() only returns RECORDED chunks,
        # so without this reset such chunks would never be retried.
        self._reset_interrupted_chunks()

        # Analyze loaded state
        summary = self._analyze_state()

        logger.info(
            f"Recovery complete: {loaded_count} chunks loaded, "
            f"{len(summary['incomplete_meetings'])} incomplete meeting(s), "
            f"{len(summary['pending_chunks'])} chunk(s) pending processing"
        )

        return summary

    def _reset_interrupted_chunks(self) -> int:
        """
        Move every chunk currently in SENT back to RECORDED.

        Intended for startup only: at that point no transcription request
        can still be in flight, so a SENT chunk is an interrupted one.
        NOTE(review): assumes transition_chunk() returns a truthy value on
        success, as the example scripts in this change rely on - confirm.

        Returns:
            Number of chunks that were reset
        """
        # Copy to a list so transitions cannot disturb the iteration.
        interrupted = list(
            self.chunk_manager.get_chunks_by_state(
                ChunkState.SENT,
                meeting_id=None
            )
        )

        reset_count = 0
        for chunk in interrupted:
            if self.chunk_manager.transition_chunk(
                chunk.chunk_id,
                ChunkState.RECORDED
            ):
                reset_count += 1
            else:
                logger.warning(
                    f"Could not reset interrupted chunk {chunk.chunk_id} "
                    f"to RECORDED"
                )

        if reset_count > 0:
            logger.info(
                f"Reset {reset_count} interrupted chunk(s) from SENT to RECORDED"
            )

        return reset_count

    def _record_failure(self, chunk_id: str, reason: str) -> bool:
        """
        Mark a chunk as failed and log when it has no retries left.

        Args:
            chunk_id: ID of the chunk that failed
            reason: Error text stored with the failure

        Returns:
            The value returned by mark_chunk_failed() (truthy when the
            chunk should be retried)
        """
        should_retry = self.chunk_manager.mark_chunk_failed(chunk_id, reason)

        if not should_retry:
            logger.error(
                f"Chunk {chunk_id} exceeded max retries. "
                f"Manual intervention required."
            )

        return should_retry

    def _analyze_state(self) -> dict:
        """Analyze loaded state and identify work to be done."""
        all_chunks = list(self.chunk_manager._chunks.values())

        # Group chunks by meeting
        meetings = {}
        for chunk in all_chunks:
            meetings.setdefault(chunk.meeting_id, []).append(chunk)

        incomplete_meetings = []
        pending_chunks = []
        failed_chunks = []

        for meeting_id, chunks in meetings.items():
            # Check if meeting is complete
            is_complete, status = self.chunk_manager.is_meeting_complete(meeting_id)

            if is_complete:
                continue

            incomplete_meetings.append({
                "meeting_id": meeting_id,
                "status": status,
                "chunk_count": len(chunks)
            })

            # Find chunks that need processing; a chunk that is out of
            # retries is reported as failed rather than pending.
            for chunk in chunks:
                if chunk.state == ChunkState.TRANSCRIBED:
                    continue
                if chunk.retry_count >= self.chunk_manager.max_retries:
                    failed_chunks.append(chunk.chunk_id)
                else:
                    pending_chunks.append(chunk.chunk_id)

        return {
            "loaded_chunks": len(all_chunks),
            "incomplete_meetings": incomplete_meetings,
            "pending_chunks": pending_chunks,
            "failed_chunks": failed_chunks
        }

    def get_pending_work(self, meeting_id: Optional[str] = None) -> List[ChunkMetadata]:
        """
        Get all chunks that need processing.

        Returns chunks in RECORDED state that haven't exceeded max retries.

        Args:
            meeting_id: Optional meeting ID to filter by

        Returns:
            List of chunks ready for processing
        """
        chunks = self.chunk_manager.get_chunks_by_state(
            ChunkState.RECORDED,
            meeting_id=meeting_id
        )

        # Filter out chunks that exceeded max retries
        return [
            chunk for chunk in chunks
            if chunk.retry_count < self.chunk_manager.max_retries
        ]

    def get_incomplete_meetings(self) -> dict:
        """
        Get all meetings that are not fully complete.

        Returns:
            Dictionary mapping meeting_id to completion status
        """
        incomplete = {}
        # Remember every meeting already examined, complete or not, so the
        # completion check runs once per meeting instead of once per chunk.
        examined = set()

        for chunk in list(self.chunk_manager._chunks.values()):
            meeting_id = chunk.meeting_id
            if meeting_id in examined:
                continue
            examined.add(meeting_id)

            is_complete, status = self.chunk_manager.is_meeting_complete(meeting_id)
            if not is_complete:
                incomplete[meeting_id] = {
                    "status": status,
                    "summary": self.chunk_manager.get_summary(meeting_id)
                }

        return incomplete

    def log_recovery_report(self) -> None:
        """Generate and log a detailed recovery report."""
        summary = self.chunk_manager.get_summary()
        incomplete = self.get_incomplete_meetings()

        logger.info("=" * 60)
        logger.info("RECOVERY REPORT")
        logger.info("=" * 60)
        logger.info(f"Total chunks: {summary['total']}")
        logger.info(f" - Recorded: {summary['recorded']}")
        logger.info(f" - Sent: {summary['sent']}")
        logger.info(f" - Transcribed: {summary['transcribed']}")
        logger.info(f" - Failed: {summary['failed']}")
        logger.info("")

        if incomplete:
            logger.info(f"Incomplete meetings: {len(incomplete)}")
            for meeting_id, info in incomplete.items():
                logger.info(f" Meeting {meeting_id[:8]}:")
                logger.info(f" Status: {info['status']}")
                logger.info(f" Chunks: {info['summary']}")
        else:
            logger.info("No incomplete meetings found")

        logger.info("=" * 60)

    async def resume_processing(
        self,
        transcription_handler: Callable[[ChunkMetadata], Awaitable[bool]],
        meeting_id: Optional[str] = None
    ) -> dict:
        """
        Resume processing of pending chunks.

        This is a helper method that can be called by the main bot
        to automatically retry failed chunks. Each pending chunk is
        attempted once per call, in sequence.

        Args:
            transcription_handler: Async function that handles transcription
                Should return True on success, False on failure
            meeting_id: Optional meeting ID to limit processing to

        Returns:
            Dictionary with processing results (keys: processed, succeeded,
            failed)
        """
        pending = self.get_pending_work(meeting_id)

        if not pending:
            logger.info("No pending chunks to process")
            return {
                "processed": 0,
                "succeeded": 0,
                "failed": 0
            }

        logger.info(f"Resuming processing for {len(pending)} pending chunk(s)")

        succeeded = 0
        failed = 0

        for chunk in pending:
            chunk_id = chunk.chunk_id
            try:
                logger.info(
                    f"Processing chunk {chunk_id} "
                    f"(attempt {chunk.retry_count + 1}/{self.chunk_manager.max_retries})"
                )

                # Mark as sent. If the transition is rejected the chunk is
                # not in the state we expect, so do not spend a
                # transcription call on it; it stays where it is.
                if not self.chunk_manager.transition_chunk(chunk_id, ChunkState.SENT):
                    logger.error(
                        f"Chunk {chunk_id} could not be marked as sent; "
                        f"skipping transcription"
                    )
                    failed += 1
                    continue

                # Call transcription handler
                success = await transcription_handler(chunk)

                if success:
                    # Mark as transcribed
                    self.chunk_manager.transition_chunk(
                        chunk_id,
                        ChunkState.TRANSCRIBED
                    )
                    succeeded += 1
                    logger.info(f"Successfully processed chunk {chunk_id}")
                else:
                    # Mark failed and check if should retry
                    self._record_failure(
                        chunk_id,
                        "Transcription handler returned failure"
                    )
                    failed += 1

            except Exception as e:
                # Best-effort batch: one bad chunk must not stop the rest.
                # Keep the traceback so the failure can be diagnosed.
                logger.error(
                    f"Error processing chunk {chunk_id}: {e}",
                    exc_info=True
                )
                self._record_failure(chunk_id, str(e))
                failed += 1

        logger.info(
            f"Processing complete: {succeeded} succeeded, {failed} failed"
        )

        return {
            "processed": len(pending),
            "succeeded": succeeded,
            "failed": failed
        }

    def verify_chunk_files_exist(self) -> dict:
        """
        Verify that all chunk files referenced in state actually exist on disk.

        Returns:
            Dictionary with verification results (total_chunks and a
            missing_files list describing each chunk whose file is absent)
        """
        all_chunks = list(self.chunk_manager._chunks.values())
        missing = []

        for chunk in all_chunks:
            if Path(chunk.file_path).exists():
                continue

            missing.append({
                "chunk_id": chunk.chunk_id,
                "meeting_id": chunk.meeting_id,
                "file_path": chunk.file_path,
                "state": chunk.state.value
            })
            logger.warning(
                f"Chunk {chunk.chunk_id} references missing file: {chunk.file_path}"
            )

        if missing:
            logger.warning(f"Found {len(missing)} chunk(s) with missing files")
        else:
            logger.info("All chunk files verified present")

        return {
            "total_chunks": len(all_chunks),
            "missing_files": missing
        }
chunk.retry_count >= manager.max_retries: + continue + + # Apply retry delay if this is a retry + if chunk.retry_count > 0: + delay = manager.get_retry_delay(chunk.retry_count) + await asyncio.sleep(delay) + + # Mark as sent before calling API + manager.transition_chunk(chunk.chunk_id, ChunkState.SENT) + + try: + # Call your transcription API + result = await your_transcription_api(chunk.file_path) + + # Store result + manager.set_transcription_result(chunk.chunk_id, result) + + # Mark as transcribed + manager.transition_chunk(chunk.chunk_id, ChunkState.TRANSCRIBED) + + except Exception as e: + # Mark failed (automatically transitions back to RECORDED if should retry) + should_retry = manager.mark_chunk_failed(chunk.chunk_id, str(e)) + + if not should_retry: + # Exceeded max retries - log for manual intervention + logger.error(f"Chunk {chunk.chunk_id} permanently failed") +``` + +## Bot Startup Integration + +Add this to your bot's startup sequence: + +```python +from recovery_manager import RecoveryManager + +# Initialize managers +chunk_manager = ChunkStateManager() +recovery = RecoveryManager(chunk_manager) + +# Perform crash recovery +recovery_summary = recovery.recover_on_startup() + +# Log recovery report +recovery.log_recovery_report() + +# Get pending work +pending = recovery.get_pending_work() +if pending: + logger.info(f"Found {len(pending)} pending chunks to process") + # Schedule processing... +``` + +## Checking Meeting Completion + +Before generating a summary: + +```python +# Check if all chunks are transcribed +is_complete, status = manager.is_meeting_complete(meeting_id) + +if is_complete: + # Safe to generate summary + chunks = manager.get_chunks_by_meeting(meeting_id) + transcripts = [c.transcription_result for c in chunks] + # Generate summary... 
+ + # Clean up state after summary is posted + manager.clear_meeting(meeting_id) +else: + # Not ready yet + logger.warning(f"Meeting not complete: {status}") + + # Check for permanently failed chunks + failed = manager.get_failed_chunks(meeting_id) + if failed: + # Alert: manual intervention needed + logger.error(f"{len(failed)} chunks permanently failed") +``` + +## Error Handling Patterns + +### Pattern 1: Simple Retry + +```python +async def process_with_retry(chunk_id: str) -> bool: + chunk = manager.get_chunk(chunk_id) + + manager.transition_chunk(chunk_id, ChunkState.SENT) + + try: + result = await transcribe(chunk.file_path) + manager.set_transcription_result(chunk_id, result) + manager.transition_chunk(chunk_id, ChunkState.TRANSCRIBED) + return True + except Exception as e: + should_retry = manager.mark_chunk_failed(chunk_id, str(e)) + return False +``` + +### Pattern 2: Batch Processing with Retry + +```python +async def process_meeting(meeting_id: str): + while True: + pending = manager.get_chunks_by_state( + ChunkState.RECORDED, + meeting_id=meeting_id + ) + + # Filter out permanently failed + pending = [c for c in pending if c.retry_count < manager.max_retries] + + if not pending: + break + + for chunk in pending: + # Apply backoff + if chunk.retry_count > 0: + delay = manager.get_retry_delay(chunk.retry_count) + await asyncio.sleep(delay) + + await process_with_retry(chunk.chunk_id) +``` + +### Pattern 3: With Progress Tracking + +```python +async def process_with_progress(meeting_id: str): + summary = manager.get_summary(meeting_id) + total = summary['total'] + + while True: + summary = manager.get_summary(meeting_id) + completed = summary['transcribed'] + failed = summary['failed'] + + logger.info(f"Progress: {completed}/{total} completed, {failed} failed") + + pending = manager.get_chunks_by_state( + ChunkState.RECORDED, + meeting_id=meeting_id + ) + pending = [c for c in pending if c.retry_count < manager.max_retries] + + if not pending: + break + 
+ for chunk in pending: + await process_with_retry(chunk.chunk_id) +``` + +## Configuration + +Adjust retry behavior in your code: + +```python +manager = ChunkStateManager() + +# Customize retry settings +manager.max_retries = 3 # Default: 5 +manager.retry_base_delay = 5 # Default: 2 seconds +manager.use_exponential_backoff = True # Default: True + +# Exponential backoff delays: +# Retry 1: 5s +# Retry 2: 10s +# Retry 3: 20s +``` + +## Common Operations + +### Get Summary +```python +summary = manager.get_summary(meeting_id) +# Returns: {'total': 10, 'recorded': 2, 'sent': 1, 'transcribed': 7, 'failed': 0} +``` + +### Get Failed Chunks +```python +failed = manager.get_failed_chunks(meeting_id) +for chunk in failed: + print(f"{chunk.chunk_id}: {chunk.last_error}") +``` + +### Clear Completed Meeting +```python +# Only clears chunks in terminal states (TRANSCRIBED or permanently failed) +cleared = manager.clear_meeting(meeting_id) +print(f"Cleared {cleared} chunks") +``` + +### Manual State Inspection +```python +chunk = manager.get_chunk(chunk_id) +print(f"State: {chunk.state.value}") +print(f"Retries: {chunk.retry_count}") +print(f"Last error: {chunk.last_error}") +print(f"Created: {chunk.created_at}") +print(f"Updated: {chunk.updated_at}") +``` + +## Testing Your Integration + +Use the test script: + +```bash +python examples/test_chunk_state.py +``` + +Or the integration example: + +```bash +python examples/integration_example.py +``` + +## Troubleshooting + +**Chunks not persisting:** +- Check that `state_dir` exists with write permissions +- Verify `_persist_chunk()` is being called (check logs) + +**Invalid state transition errors:** +- Review valid transitions in `ChunkMetadata.transition_to()` +- Use the transition methods, don't set state directly + +**Chunks stuck in SENT:** +- Check if process crashed before transition to TRANSCRIBED +- Recovery manager will reset to RECORDED on restart + +**Meeting never completes:** +- Run `get_summary()` to see chunk 
states +- Check for failed chunks with `get_failed_chunks()` +- Verify all chunks were registered + +## Next Steps + +See [CONTRIBUTING.md](../CONTRIBUTING.md) for detailed developer guide. + +See [README.md](../README.md) for full system documentation. diff --git a/examples/integration_example.py b/examples/integration_example.py new file mode 100644 index 0000000..979b8ea --- /dev/null +++ b/examples/integration_example.py @@ -0,0 +1,259 @@ +""" +Example integration: Chunk state manager with mock transcription. + +This example shows how to integrate the chunk state system with +a transcription handler (simulated with random failures). + +Demonstrates: +- Registering chunks as they're recorded +- Processing chunks with retry on failure +- Handling permanent failures +- Checking meeting completion + +Run with: python examples/integration_example.py +""" + +import sys +import asyncio +import logging +import random +from pathlib import Path + +# Add bot directory to path +sys.path.insert(0, str(Path(__file__).parent.parent / "bot")) + +from chunk_state_manager import ChunkStateManager, ChunkState +from recovery_manager import RecoveryManager + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class MockTranscriptionAPI: + """Simulates a transcription API with random failures.""" + + def __init__(self, failure_rate: float = 0.4): + self.failure_rate = failure_rate + self.call_count = 0 + + async def transcribe(self, audio_path: str) -> str: + """Simulate transcription with random failures.""" + self.call_count += 1 + + # Simulate API latency + await asyncio.sleep(0.1) + + # Random failure + if random.random() < self.failure_rate: + raise Exception("API timeout or rate limit") + + # Success + return f"Transcribed text from {audio_path}" + + +async def process_chunk( + chunk_id: str, + manager: ChunkStateManager, + api: MockTranscriptionAPI +) -> bool: + """ + Process 
async def process_meeting_with_retry(
    meeting_id: str,
    manager: ChunkStateManager,
    api: MockTranscriptionAPI
) -> dict:
    """
    Drive every chunk of a meeting through transcription, retrying failures.

    Repeatedly fetches the meeting's RECORDED chunks that still have retries
    left and processes them one by one, sleeping for the manager-provided
    delay before any chunk that has already failed at least once. Stops
    when no such chunk remains.

    Returns:
        Dictionary with processing statistics (total_chunks, succeeded,
        failed, attempts)
    """
    logger.info(f"Starting processing for meeting {meeting_id}")

    succeeded = 0
    attempts = 0

    while True:
        recorded = manager.get_chunks_by_state(
            ChunkState.RECORDED,
            meeting_id=meeting_id
        )

        # Chunks that are out of retries stay RECORDED; leave them alone.
        retryable = [
            candidate for candidate in recorded
            if candidate.retry_count < manager.max_retries
        ]

        if not retryable:
            break

        logger.info(f"Processing {len(retryable)} pending chunk(s)")

        for chunk in retryable:
            attempts += 1

            # Back off only when this chunk has failed before.
            if chunk.retry_count > 0:
                delay = manager.get_retry_delay(chunk.retry_count)
                logger.info(
                    f"Retrying chunk {chunk.chunk_id} after {delay}s "
                    f"(attempt {chunk.retry_count + 1}/{manager.max_retries})"
                )
                await asyncio.sleep(delay)

            if await process_chunk(chunk.chunk_id, manager, api):
                succeeded += 1

    # Whatever the manager reports as failed for this meeting is final.
    failed_count = len(manager.get_failed_chunks(meeting_id))

    _, status = manager.is_meeting_complete(meeting_id)

    logger.info(f"Processing complete for meeting {meeting_id}")
    logger.info(f" Status: {status}")
    logger.info(f" Total attempts: {attempts}")
    logger.info(f" API calls: {api.call_count}")

    return {
        "total_chunks": succeeded + failed_count,
        "succeeded": succeeded,
        "failed": failed_count,
        "attempts": attempts
    }
Simulating audio recording (creating chunks)...") + + # Simulate recording creating chunks + chunk_ids = [] + for i in range(5): + chunk_id = f"chunk-{i:03d}" + manager.register_chunk( + chunk_id=chunk_id, + meeting_id=meeting_id, + file_path=f"recordings/{meeting_id}/{chunk_id}.wav" + ) + chunk_ids.append(chunk_id) + + print(f"Created {len(chunk_ids)} chunks") + + print("\n2. Processing chunks with automatic retry...") + + # Process all chunks + stats = await process_meeting_with_retry(meeting_id, manager, api) + + print("\n3. Results:") + print(f" Total chunks: {stats['total_chunks']}") + print(f" Succeeded: {stats['succeeded']}") + print(f" Failed: {stats['failed']}") + print(f" Total attempts: {stats['attempts']}") + print(f" API calls: {api.call_count}") + + # Show summary + summary = manager.get_summary(meeting_id) + print(f"\n4. Final state summary:") + print(f" Transcribed: {summary['transcribed']}") + print(f" Failed: {summary['failed']}") + + # Check completion + is_complete, status = manager.is_meeting_complete(meeting_id) + print(f"\n5. Meeting completion:") + print(f" Complete: {is_complete}") + print(f" Status: {status}") + + # Show failed chunks if any + failed = manager.get_failed_chunks(meeting_id) + if failed: + print(f"\n6. Permanently failed chunks (require manual intervention):") + for chunk in failed: + print(f" - {chunk.chunk_id}: {chunk.last_error}") + print(f" Retry count: {chunk.retry_count}") + + # Simulate crash recovery + print("\n7. Simulating bot restart (crash recovery)...") + new_manager = ChunkStateManager(state_dir="example_state") + recovery = RecoveryManager(new_manager) + + recovery_summary = recovery.recover_on_startup() + print(f" Loaded chunks: {recovery_summary['loaded_chunks']}") + print(f" Incomplete meetings: {len(recovery_summary['incomplete_meetings'])}") + print(f" Pending chunks: {len(recovery_summary['pending_chunks'])}") + + # Cleanup + print("\n8. 
Cleaning up...") + import shutil + shutil.rmtree("example_state") + print(" Removed example_state directory") + + print("\n" + "="*60) + print("EXAMPLE COMPLETE") + print("="*60) + print("\nKey takeaways:") + print(" - Chunks are registered as RECORDED") + print(" - Failed chunks automatically retry with backoff") + print(" - Permanent failures are tracked separately") + print(" - State survives crashes and is recovered on restart") + print(" - Meeting completion requires ALL chunks transcribed") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/test_chunk_state.py b/examples/test_chunk_state.py new file mode 100644 index 0000000..5ec3f61 --- /dev/null +++ b/examples/test_chunk_state.py @@ -0,0 +1,269 @@ +""" +Test script demonstrating chunk state tracking and retry behavior. + +This standalone script shows: +- Chunk registration +- State transitions +- Retry handling +- Persistence and recovery +- Meeting completion validation + +Run with: python examples/test_chunk_state.py +""" + +import sys +import logging +from pathlib import Path + +# Add bot directory to path +sys.path.insert(0, str(Path(__file__).parent.parent / "bot")) + +from chunk_state_manager import ChunkStateManager, ChunkState +from recovery_manager import RecoveryManager + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def test_chunk_registration(): + """Test basic chunk registration and state tracking.""" + print("\n" + "="*60) + print("TEST 1: Chunk Registration") + print("="*60) + + manager = ChunkStateManager(state_dir="test_state") + + # Register some chunks + chunk1 = manager.register_chunk( + chunk_id="chunk-001", + meeting_id="meeting-alpha", + file_path="recordings/meeting-alpha/chunk-001.wav" + ) + + chunk2 = manager.register_chunk( + chunk_id="chunk-002", + meeting_id="meeting-alpha", + file_path="recordings/meeting-alpha/chunk-002.wav" + ) + + chunk3 = 
def test_state_transitions(manager: ChunkStateManager):
    """Walk chunk-001 through two valid transitions and one invalid one.

    Prints the outcome of each attempt and the chunk's final state; the
    last attempt (out of TRANSCRIBED) is expected to be rejected.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("TEST 2: State Transitions")
    print(rule)

    chunk_id = "chunk-001"

    # (announcement, target state, label printed when the attempt fails)
    attempts = (
        ("\nAttempting RECORDED -> SENT...", ChunkState.SENT, "Failed"),
        ("\nAttempting SENT -> TRANSCRIBED...", ChunkState.TRANSCRIBED, "Failed"),
        (
            "\nAttempting TRANSCRIBED -> RECORDED (should fail)...",
            ChunkState.RECORDED,
            "Failed (expected)",
        ),
    )

    for announcement, target_state, failure_label in attempts:
        print(announcement)
        accepted = manager.transition_chunk(chunk_id, target_state)
        outcome = "Success" if accepted else failure_label
        print(f"Result: {outcome}")

    final_chunk = manager.get_chunk(chunk_id)
    print(f"\nFinal state: {final_chunk.state.value}")
def test_meeting_completion(manager: ChunkStateManager):
    """Show meeting-alpha's completion status before and after finishing chunk-002."""
    rule = "=" * 60
    print("\n" + rule)
    print("TEST 4: Meeting Completion")
    print(rule)

    meeting_id = "meeting-alpha"
    chunk_id = "chunk-002"

    def report(heading: str) -> None:
        # Query and print the meeting's current completion status.
        is_complete, status = manager.is_meeting_complete(meeting_id)
        print(heading)
        print(f" Complete: {is_complete}")
        print(f" Status: {status}")

    # Check completion (should be incomplete)
    report(f"\nMeeting {meeting_id}:")

    # Complete the incomplete chunk
    chunk = manager.get_chunk(chunk_id)
    already_done = chunk.state == ChunkState.TRANSCRIBED

    if not already_done:
        print(f"\nCompleting chunk {chunk_id}...")
        # Demo-only shortcut: rewind the in-memory chunk directly so the
        # normal RECORDED -> SENT -> TRANSCRIBED path can be replayed.
        chunk.retry_count = 0
        chunk.state = ChunkState.RECORDED
        for target_state in (ChunkState.SENT, ChunkState.TRANSCRIBED):
            manager.transition_chunk(chunk_id, target_state)

    # Check again
    report(f"\nMeeting {meeting_id} after completion:")

    # Show summary
    print(f" Summary: {manager.get_summary(meeting_id)}")
def test_cleanup():
    """Delete the ``test_state`` directory if the demo run left one behind."""
    import shutil

    rule = "=" * 60
    print("\n" + rule)
    print("CLEANUP")
    print(rule)

    leftover = Path("test_state")
    if leftover.exists():
        shutil.rmtree(leftover)
        print("\nRemoved test_state directory")

    print("\nTest complete!")
Persistence and crash recovery") + + try: + # Run tests in sequence + manager = test_chunk_registration() + test_state_transitions(manager) + test_retry_handling(manager) + test_meeting_completion(manager) + test_persistence_and_recovery() + + # Cleanup + test_cleanup() + + except Exception as e: + logger.error(f"Test failed: {e}", exc_info=True) + return 1 + + return 0 + + +if __name__ == "__main__": + sys.exit(main())