Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions ragitect/api/schemas/document_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Document input API schemas for URL-based ingestion.

Pydantic models for document input requests supporting both file uploads
and URL-based ingestion with discriminated union pattern.
"""

from typing import Annotated, Literal

from pydantic import AnyUrl, BaseModel, ConfigDict, Field, UrlConstraints
from pydantic.alias_generators import to_camel


# URL type accepted by the URL-ingestion schemas.
# NOTE: Allowed schemes are deliberately left unconstrained at the schema
# level, so that the API layer can reject non-HTTP(S) schemes itself and answer
# with a 400 carrying the required error message (AC2).
_INGEST_URL_CONSTRAINTS = UrlConstraints(max_length=2000, host_required=False)

SafeIngestUrl = Annotated[AnyUrl, _INGEST_URL_CONSTRAINTS]


class URLUploadInput(BaseModel):
    """Request body for submitting a URL to be ingested as a document.

    Field names are exposed in camelCase on the wire (``sourceType``), while
    the snake_case names are also accepted when populating the model.

    Attributes:
        source_type: Kind of resource behind the URL - "url" (web page),
            "youtube", or "pdf". Selects the processing strategy.
        url: The HTTP/HTTPS URL to ingest.

    Example:
        ```json
        {
            "sourceType": "url",
            "url": "https://example.com/article"
        }
        ```

    Security Notes:
        The field type itself does not restrict the URL scheme; the rules
        below describe what the API accepts for ingestion.

        - Only HTTP and HTTPS URLs are allowed
        - Private IPs (10.x.x.x, 172.16.x.x, 192.168.x.x) are blocked
        - Localhost addresses are blocked
        - Cloud metadata endpoints (169.254.x.x) are blocked
    """

    model_config = ConfigDict(
        alias_generator=to_camel,
        populate_by_name=True,
        json_schema_extra={
            "examples": [
                {"sourceType": "url", "url": "https://example.com/article"},
                {
                    "sourceType": "youtube",
                    "url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
                },
                {"sourceType": "pdf", "url": "https://arxiv.org/pdf/2301.00001.pdf"},
            ]
        },
    )

    # Both fields are required: a Field() without a default marks it as such.
    source_type: Literal["url", "youtube", "pdf"] = Field(
        description="Type of URL source: 'url' for web pages, 'youtube' for videos, 'pdf' for PDF files",
    )
    url: SafeIngestUrl = Field(description="The HTTP/HTTPS URL to ingest")


class URLUploadResponse(BaseModel):
    """Response body returned after a URL has been accepted for ingestion.

    Same structure as DocumentUploadResponse but with URL-specific metadata.
    Field names are exposed in camelCase on the wire (``sourceType``,
    ``sourceUrl``).

    Attributes:
        id: Unique document identifier (UUID), serialized as a string
        source_type: Type of URL source
        source_url: The submitted URL
        status: Processing status (backlog = queued for fetching)
        message: Human-readable status message

    Example:
        ```json
        {
            "id": "550e8400-e29b-41d4-a716-446655440000",
            "sourceType": "url",
            "sourceUrl": "https://example.com/article",
            "status": "backlog",
            "message": "URL submitted for ingestion. Processing will begin shortly."
        }
        ```
    """

    model_config = ConfigDict(
        populate_by_name=True,
        alias_generator=to_camel,
        json_schema_extra={
            # "examples" (a list) is the JSON Schema keyword; the singular
            # "example" is the legacy OpenAPI 3.0 spelling.
            "examples": [
                {
                    "id": "550e8400-e29b-41d4-a716-446655440000",
                    "sourceType": "url",
                    "sourceUrl": "https://example.com/article",
                    "status": "backlog",
                    "message": "URL submitted for ingestion. Processing will begin shortly.",
                }
            ]
        },
    )

    id: str = Field(..., description="Unique document identifier (UUID)")
    source_type: Literal["url", "youtube", "pdf"] = Field(
        ..., description="Type of URL source"
    )
    source_url: str = Field(..., description="The submitted URL")
    status: str = Field(
        default="backlog",
        description="Document status: 'backlog' means queued for fetching",
    )
    message: str = Field(
        default="URL submitted for ingestion",
        description="Human-readable status message",
    )
136 changes: 134 additions & 2 deletions ragitect/api/v1/documents.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
"""Document API endpoints

Provides REST API endpoints for document operations:
- POST /api/v1/workspaces/{workspace_id}/documents - Upload documents (file)
- POST /api/v1/workspaces/{workspace_id}/documents/upload-url - Upload documents (URL)
- GET /api/v1/workspaces/{workspace_id}/documents - List documents
- GET /api/v1/documents/{document_id} - Get document detail
- DELETE /api/v1/documents/{document_id} - Delete document
"""

import logging
from urllib.parse import urlsplit, urlunsplit
from uuid import UUID

from fastapi import (
Expand All @@ -27,14 +29,20 @@
DocumentStatusResponse,
DocumentUploadResponse,
)
from ragitect.api.schemas.document_input import URLUploadInput, URLUploadResponse
from ragitect.services.database.connection import get_async_session
from ragitect.services.database.exceptions import NotFoundError
from ragitect.services.database.exceptions import DuplicateError, NotFoundError
from ragitect.services.database.repositories.document_repo import DocumentRepository
from ragitect.services.database.repositories.workspace_repo import WorkspaceRepository
from ragitect.services.document_processing_service import DocumentProcessingService
from ragitect.services.document_upload_service import DocumentUploadService
from ragitect.services.exceptions import FileSizeExceededError
from ragitect.services.processor.factory import UnsupportedFormatError
from ragitect.services.validators.url_validator import (
InvalidURLSchemeError,
SSRFAttemptError,
URLValidator,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -158,6 +166,130 @@ async def upload_documents(
) from e


def _sanitize_source_url(raw_url: str) -> tuple[str, str]:
    """Strip userinfo and fragment from *raw_url*.

    Args:
        raw_url: The URL as submitted by the client.

    Returns:
        A ``(sanitized_url, safe_log_url)`` pair. ``sanitized_url`` keeps
        scheme, host, port, path and query; ``safe_log_url`` additionally
        drops the query string so it can be written to logs.
    """
    parts = urlsplit(raw_url)
    host = parts.hostname or ""
    # urlsplit() returns IPv6 literals without their brackets; restore them so
    # the host cannot be confused with the ":port" suffix.
    if ":" in host and not host.startswith("["):
        host = f"[{host}]"
    # Compare against None so an explicit port 0 is preserved, not dropped.
    port = f":{parts.port}" if parts.port is not None else ""
    netloc = f"{host}{port}"

    sanitized_url = urlunsplit((parts.scheme, netloc, parts.path, parts.query, ""))
    safe_log_url = f"{parts.scheme}://{netloc}{parts.path}"
    return sanitized_url, safe_log_url


@router.post(
    "/{workspace_id}/documents/upload-url",
    response_model=URLUploadResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Submit URL for document ingestion",
    description="""Submit a URL for document ingestion. Supports web pages, YouTube videos, and PDF URLs.

The URL is validated for security (SSRF prevention) and queued for background processing.
Actual content fetching happens asynchronously (Story 5.5).

**Security Notes:**
- Only HTTP and HTTPS URLs are allowed
- Private IPs and localhost are blocked for security reasons
- Cloud metadata endpoints (169.254.x.x) are blocked
""",
)
async def upload_url(
    workspace_id: UUID,
    input_data: URLUploadInput,
    session: AsyncSession = Depends(get_async_session),
) -> URLUploadResponse:
    """Submit URL for document ingestion

    The submitted URL is stripped of userinfo and fragment before it is
    validated, stored, or echoed back; log lines additionally omit the query
    string.

    Args:
        workspace_id: Target workspace UUID
        input_data: URL upload input with source_type and url
        session: Database session (injected by FastAPI)

    Returns:
        URLUploadResponse with document ID and status

    Raises:
        HTTPException 404: If workspace not found
        HTTPException 400: If URL validation fails (invalid scheme or SSRF attempt)
        HTTPException 409: If URL already submitted for this workspace
    """
    source_type = input_data.source_type
    sanitized_url, safe_log_url = _sanitize_source_url(str(input_data.url))

    logger.info(
        "URL upload request: workspace=%s, type=%s, url=%s",
        workspace_id,
        source_type,
        safe_log_url,
    )

    # Validate workspace exists
    workspace_repo = WorkspaceRepository(session)
    try:
        _ = await workspace_repo.get_by_id_or_raise(workspace_id)
    except NotFoundError as e:
        logger.warning("Workspace not found: %s", workspace_id)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Workspace not found: {workspace_id}",
        ) from e

    # Validate URL security (SSRF prevention)
    url_validator = URLValidator()
    try:
        url_validator.validate_url(sanitized_url)
    except InvalidURLSchemeError as e:
        logger.warning("Invalid URL scheme blocked: %s", safe_log_url)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except SSRFAttemptError as e:
        logger.warning("SSRF attempt blocked: %s", safe_log_url)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e

    # Create document placeholder (status="backlog"). Only the insert and the
    # commit sit inside the try, as the steps that talk to the database.
    document_repo = DocumentRepository(session)
    try:
        document = await document_repo.create_from_url(
            workspace_id=workspace_id,
            source_url=sanitized_url,
            source_type=source_type,
        )
        await session.commit()
    except DuplicateError as e:
        logger.warning("Duplicate URL submission: %s", safe_log_url)
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"URL already submitted for this workspace: {sanitized_url}",
        ) from e

    logger.info(
        "URL submitted for ingestion: document_id=%s, url=%s",
        document.id,
        safe_log_url,
    )

    # NOTE: Background processing NOT triggered here (Story 5.5)
    # Document will remain in "backlog" status until background task picks it up
    return URLUploadResponse(
        id=str(document.id),
        source_type=source_type,
        source_url=sanitized_url,
        status="backlog",
        message="URL submitted for ingestion. Processing will begin shortly.",
    )


@router.get(
"/documents/{document_id}/status",
response_model=DocumentStatusResponse,
Expand Down
Loading