Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions ragitect/api/v1/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,31 @@ async def process_document_background(document_id: UUID) -> None:
# Error already logged and status updated in service


async def fetch_and_process_url_background(
    document_id: UUID,
    url: str,
    source_type: str,
) -> None:
    """Background task wrapper for URL document processing.

    Opens its own database session (background tasks run outside the
    request lifecycle) and delegates the fetch/parse work to
    URLProcessingService. Any failure is logged and swallowed here so the
    task runner never sees an unhandled exception.

    Args:
        document_id: Document UUID to process
        url: Source URL to fetch content from
        source_type: Type of URL source ("url", "youtube", "pdf")
    """
    # Deferred imports — presumably to avoid import cycles at module load;
    # confirm against the package layout.
    from ragitect.services.database.connection import get_session_factory
    from ragitect.services.url_processing_service import URLProcessingService

    factory = get_session_factory()
    async with factory() as session:
        try:
            service = URLProcessingService(session)
            await service.process_url_document(document_id, url, source_type)
        except Exception as e:
            logger.error(f"Background URL processing failed for {document_id}: {e}")
            # Error already logged and status updated in service


@router.post(
"/{workspace_id}/documents",
response_model=DocumentListResponse,
Expand Down Expand Up @@ -185,13 +210,15 @@ async def upload_documents(
async def upload_url(
workspace_id: UUID,
input_data: URLUploadInput,
background_tasks: BackgroundTasks = BackgroundTasks(),
session: AsyncSession = Depends(get_async_session),
) -> URLUploadResponse:
"""Submit URL for document ingestion

Args:
workspace_id: Target workspace UUID
input_data: URL upload input with source_type and url
background_tasks: FastAPI background tasks for async processing
session: Database session (injected by FastAPI)

Returns:
Expand Down Expand Up @@ -271,8 +298,14 @@ async def upload_url(
safe_log_url,
)

# NOTE: Background processing NOT triggered here (Story 5.5)
# Document will remain in "backlog" status until background task picks it up
# Trigger background URL processing
background_tasks.add_task(
fetch_and_process_url_background,
document_id=document.id,
url=sanitized_url,
source_type=source_type,
)
logger.info(f"Scheduled background URL processing for document {document.id}")

return URLUploadResponse(
id=str(document.id),
Expand Down
70 changes: 55 additions & 15 deletions ragitect/services/processor/factory.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
"""
Document Processor Factory

Selects the appropriate document processor based on file type.
Uses a simple two-processor strategy:
Selects the appropriate document processor based on file type or URL source type.
Uses a simple two-processor strategy for files:
- SimpleProcessor for text-based formats (lightweight, fast)
- DoclingProcessor for complex formats (PDF, DOCX, etc.)

For URL sources (Story 5.5):
- WebURLProcessor for web pages
- YouTubeProcessor for YouTube videos
- PDFURLProcessor for PDF URLs
"""

import logging
from pathlib import Path
from typing import Literal

from ragitect.services.processor.base import BaseDocumentProcessor
from ragitect.services.processor.simple import SimpleProcessor
from ragitect.services.processor.docling_processor import DoclingProcessor
from pathlib import Path
import logging
from ragitect.services.processor.pdf_url_processor import PDFURLProcessor
from ragitect.services.processor.simple import SimpleProcessor
from ragitect.services.processor.web_url_processor import WebURLProcessor
from ragitect.services.processor.youtube_processor import YouTubeProcessor

logger = logging.getLogger(__name__)

Expand All @@ -28,40 +38,70 @@ class UnsupportedFormatError(Exception):


class ProcessorFactory:
    """Factory for selecting appropriate document processor based on filetype or URL source type.

    Files are routed by extension (SimpleProcessor for text formats,
    DoclingProcessor for complex formats); URL sources (Story 5.5) are
    routed by ``source_type`` to the matching URL processor.
    """

    def __init__(self):
        logger.info("Initializing ProcessorFactory")

        # File-based processors
        self.simple_processor: BaseDocumentProcessor = SimpleProcessor()
        self.docling_processor: BaseDocumentProcessor = DoclingProcessor()

        # URL-based processors (Story 5.5)
        self.web_url_processor: WebURLProcessor = WebURLProcessor()
        self.youtube_processor: YouTubeProcessor = YouTubeProcessor()
        self.pdf_url_processor: PDFURLProcessor = PDFURLProcessor()

        # Extension lists advertised by each file processor drive routing.
        self.text_formats: list[str] = self.simple_processor.supported_formats()
        self.complex_formats: list[str] = self.docling_processor.supported_formats()

        logger.info(
            f"ProcessorFactory initialized - Text formats: {len(self.text_formats)}, Complex formats: {len(self.complex_formats)}"
        )

    def get_processor(
        self,
        source: str,
        source_type: Literal["file", "url", "youtube", "pdf"] = "file",
    ) -> BaseDocumentProcessor:
        """Select the processor matching *source* / *source_type*.

        Args:
            source: File name (for files) or URL (for URL sources)
            source_type: Type of source — "file" (default, routed by
                extension), "url", "youtube", or "pdf"

        Returns:
            BaseDocumentProcessor: Processor responsible for the source

        Raises:
            UnsupportedFormatError: If source_type is "file" and the file
                extension is not supported by any processor
        """
        # URL-based routing (Story 5.5): dispatch table keyed by source_type.
        url_routes = {
            "url": (self.web_url_processor, "WebURLProcessor", "URL"),
            "youtube": (self.youtube_processor, "YouTubeProcessor", "YouTube URL"),
            "pdf": (self.pdf_url_processor, "PDFURLProcessor", "PDF URL"),
        }
        route = url_routes.get(source_type)
        if route is not None:
            processor, proc_name, label = route
            logger.info(f"Selected {proc_name} for {label}: {source[:50]}...")
            return processor

        # File-based routing (backward compatible): decide by extension.
        ext = Path(source).suffix.lower()

        if ext in self.text_formats:
            logger.info(f"Selected SimpleProcessor for file {source}")
            return self.simple_processor

        if ext in self.complex_formats:
            logger.info(f"Selected DoclingProcessor for file {source}")
            return self.docling_processor

        all_formats = sorted(self.text_formats + self.complex_formats)
        error_msg = (
            f"Unsupported format: {ext}\nSupported formats: {', '.join(all_formats)}"
        )
        logger.warning(f"Unsupported file format attempted: {ext} (file: {source})")
        raise UnsupportedFormatError(error_msg)
119 changes: 119 additions & 0 deletions ragitect/services/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Retry utility with exponential backoff for URL fetching

Implements AC4 (NFR-R3) retry logic:
- Retry up to 3 times on transient failures
- Exponential backoff delays: 1s, 2s, 4s
- Retryable errors: timeout, network errors, HTTP 5xx
- Non-retryable errors: HTTP 4xx (immediate fail)

Usage:
>>> from ragitect.services.retry import with_retry
>>> result = await with_retry(fetch_url, "https://example.com")
"""

import asyncio
import logging
from typing import Any, Callable

import httpx

logger = logging.getLogger(__name__)

# Retry configuration constants (AC4)
MAX_RETRIES = 3  # retries AFTER the initial attempt (4 calls max)
INITIAL_DELAY = 1.0  # seconds before the first retry
BACKOFF_MULTIPLIER = 2  # delay doubles each retry: 1s, 2s, 4s

# Retryable exceptions (transient network issues)
RETRYABLE_EXCEPTIONS = (
    httpx.TimeoutException,
    httpx.NetworkError,
    httpx.ConnectError,
    httpx.RemoteProtocolError,
)

# HTTP status codes that should trigger retry (transient server errors)
RETRYABLE_STATUS_CODES = {500, 502, 503, 504}

# Everything with_retry() catches; anything else propagates untouched.
_CAUGHT_EXCEPTIONS = (*RETRYABLE_EXCEPTIONS, httpx.HTTPStatusError)


def _retry_reason(exc: Exception) -> str | None:
    """Return a short log label if *exc* is retryable, else None.

    Transient network exceptions and HTTP 5xx responses are retryable;
    HTTP 4xx (and any other HTTPStatusError code) is not.
    """
    if isinstance(exc, RETRYABLE_EXCEPTIONS):
        return f"{type(exc).__name__}: {exc}"
    if (
        isinstance(exc, httpx.HTTPStatusError)
        and exc.response.status_code in RETRYABLE_STATUS_CODES
    ):
        return f"HTTP {exc.response.status_code}"
    return None


async def with_retry(func: Callable, *args: Any, **kwargs: Any) -> Any:
    """Execute async function with exponential backoff retry.

    Performs one initial attempt plus up to MAX_RETRIES retries, sleeping
    1s, 2s, 4s between attempts — matching the documented AC4 (NFR-R3)
    contract. (The previous loop ran only MAX_RETRIES total attempts, so
    just two retries happened and the 4s backoff was unreachable.)

    Retries on:
        - Timeout exceptions (httpx.TimeoutException)
        - Network errors (httpx.NetworkError, httpx.ConnectError,
          httpx.RemoteProtocolError)
        - HTTP 5xx server errors (500, 502, 503, 504)

    Does NOT retry on:
        - HTTP 4xx client errors (400, 401, 403, 404) — immediate fail
        - Content extraction errors / other application errors (not caught)

    Args:
        func: Async function to call
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from func if successful

    Raises:
        Exception: The last exception if all retries are exhausted,
            or immediately for non-retryable errors

    Example:
        >>> async def fetch_data(url: str) -> str:
        ...     async with httpx.AsyncClient() as client:
        ...         response = await client.get(url)
        ...         return response.text
        >>> result = await with_retry(fetch_data, "https://example.com")
    """
    delay = INITIAL_DELAY

    for attempt in range(MAX_RETRIES + 1):
        try:
            return await func(*args, **kwargs)

        except _CAUGHT_EXCEPTIONS as e:
            reason = _retry_reason(e)

            if reason is None:
                # Only HTTPStatusError can be non-retryable here, so
                # e.response is guaranteed to exist. 4xx → fail immediately.
                logger.warning(f"Non-retryable HTTP error: {e.response.status_code}")
                raise

            if attempt >= MAX_RETRIES:
                logger.error(f"All {MAX_RETRIES} retries exhausted: {reason}")
                raise

            logger.warning(
                f"Retry {attempt + 1}/{MAX_RETRIES} after {delay:.1f}s: {reason}"
            )
            await asyncio.sleep(delay)
            delay *= BACKOFF_MULTIPLIER

    # Unreachable: every loop iteration either returns or raises.
    raise RuntimeError("Unexpected state in retry logic")
Loading