Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions aperag/db/repositories/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,25 @@ async def _query(session):

return await self._execute_query(_query)

async def query_bots(self, users: List[str]):
async def query_bots(self, users: List[str], offset: int = 0, limit: int = None):
async def _query(session):
stmt = (
select(Bot).where(Bot.user.in_(users), Bot.status != BotStatus.DELETED).order_by(desc(Bot.gmt_created))
)
if offset > 0:
stmt = stmt.offset(offset)
if limit is not None:
stmt = stmt.limit(limit)
result = await session.execute(stmt)
return result.scalars().all()

return await self._execute_query(_query)

async def query_bots_count(self, user: str):
async def query_bots_count(self, users: List[str]):
async def _query(session):
from sqlalchemy import func

stmt = select(func.count()).select_from(Bot).where(Bot.user == user, Bot.status != BotStatus.DELETED)
stmt = select(func.count()).select_from(Bot).where(Bot.user.in_(users), Bot.status != BotStatus.DELETED)
return await session.scalar(stmt)

return await self._execute_query(_query)
Expand Down
22 changes: 21 additions & 1 deletion aperag/schema/view_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
from __future__ import annotations

from datetime import datetime
from typing import Any, Literal, Optional, Union
from typing import Any, Generic, List, Literal, Optional, TypeVar, Union

from pydantic import BaseModel, ConfigDict, EmailStr, Field, RootModel, confloat, conint

T = TypeVar("T")


class ModelSpec(BaseModel):
model: Optional[str] = Field(
Expand Down Expand Up @@ -499,6 +501,24 @@ class PaginatedResponse(BaseModel):
)


class OffsetPaginatedResponse(BaseModel, Generic[T]):
"""
Offset-based paginated response following the proposed API structure.

This provides the exact structure requested in the issue:
{
"total": 1250,
"limit": 25,
"offset": 100,
"data": [...]
}
"""
total: conint(ge=0) = Field(..., description='Total number of items available', examples=[1250])
limit: conint(ge=1) = Field(..., description='Limit that was used for this request', examples=[25])
offset: conint(ge=0) = Field(..., description='Offset that was used for this request', examples=[100])
data: List[T] = Field(..., description='Array of items for the current page')


class ChatList(PaginatedResponse):
"""
A list of chats with pagination
Expand Down
18 changes: 18 additions & 0 deletions aperag/service/api_key_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@ async def list_api_keys(self, user: str) -> ApiKeyList:
items.append(self.to_api_key_model(token))
return ApiKeyList(items=items)

async def list_api_keys_offset(self, user: str, offset: int = 0, limit: int = 50):
"""List API keys with offset-based pagination"""
from aperag.utils.offset_pagination import OffsetPaginationHelper

# Get total count
all_tokens = await self.db_ops.query_api_keys(user, is_system=False)
total = len(all_tokens)

# Apply pagination
paginated_tokens = all_tokens[offset:offset + limit] if offset < total else []

# Convert to API models
items = []
for token in paginated_tokens:
items.append(self.to_api_key_model(token))

return OffsetPaginationHelper.build_response(items, total, offset, limit)

async def create_api_key(self, user: str, api_key_create: ApiKeyCreate) -> ApiKeyModel:
"""Create a new API key"""
# For single operations, use DatabaseOps directly
Expand Down
115 changes: 115 additions & 0 deletions aperag/service/audit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,121 @@ async def _list_audit_logs(session):

return PaginationHelper.build_response(items=processed_logs, total=total, page=page, page_size=page_size)

async def list_audit_logs_offset(
self,
offset: int = 0,
limit: int = 50,
sort_by: str = None,
sort_order: str = "desc",
search: str = None,
user_id: Optional[str] = None,
resource_type: Optional[AuditResource] = None,
api_name: Optional[str] = None,
http_method: Optional[str] = None,
status_code: Optional[int] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
):
"""List audit logs with offset-based pagination"""
from aperag.utils.offset_pagination import OffsetPaginationHelper

# Define sort field mapping
sort_mapping = {
"created": AuditLog.gmt_created,
"start_time": AuditLog.start_time,
"end_time": AuditLog.end_time,
"duration": AuditLog.start_time, # Use start_time as proxy for duration sorting
"user_id": AuditLog.user_id,
"api_name": AuditLog.api_name,
"status_code": AuditLog.status_code,
}

# Define search fields mapping
search_fields = {"api_name": AuditLog.api_name, "path": AuditLog.path}

async def _list_audit_logs(session):
from sqlalchemy import func

# Build base query
stmt = select(AuditLog)

# Add filters
conditions = []
if user_id:
conditions.append(AuditLog.user_id == user_id)
if resource_type:
conditions.append(AuditLog.resource_type == resource_type)
if api_name:
conditions.append(AuditLog.api_name.like(f"%{api_name}%"))
if http_method:
conditions.append(AuditLog.http_method == http_method)
if status_code:
conditions.append(AuditLog.status_code == status_code)
if start_date:
conditions.append(AuditLog.gmt_created >= start_date)
if end_date:
conditions.append(AuditLog.gmt_created <= end_date)

if conditions:
stmt = stmt.where(and_(*conditions))

# Get total count
count_query = select(func.count()).select_from(stmt.subquery())
total = await session.scalar(count_query) or 0

# Apply sorting
if sort_by and sort_by in sort_mapping:
sort_field = sort_mapping[sort_by]
if sort_order == "asc":
stmt = stmt.order_by(sort_field)
else:
stmt = stmt.order_by(desc(sort_field))
else:
stmt = stmt.order_by(desc(AuditLog.gmt_created))

# Apply offset and limit
stmt = stmt.offset(offset).limit(limit)

# Execute query
result = await session.execute(stmt)
audit_logs = result.scalars().all()

return audit_logs, total

# Execute query with proper session management
audit_logs = None
total = 0
async for session in get_async_session():
audit_logs, total = await _list_audit_logs(session)
break # Only process one session

# Post-process audit logs outside of session to avoid long session occupation
processed_logs = []
for log in audit_logs:
if log.resource_type and log.path:
# Convert string to enum if needed
resource_type_enum = log.resource_type
if isinstance(log.resource_type, str):
try:
resource_type_enum = AuditResource(log.resource_type)
except ValueError:
resource_type_enum = None

if resource_type_enum:
log.resource_id = self.extract_resource_id_from_path(log.path, resource_type_enum)
else:
log.resource_id = None

# Calculate duration if both times are available
if log.start_time and log.end_time:
log.duration_ms = log.end_time - log.start_time
else:
log.duration_ms = None

processed_logs.append(log)

return OffsetPaginationHelper.build_response(processed_logs, total, offset, limit)


# Global audit service instance
audit_service = AuditService()
16 changes: 13 additions & 3 deletions aperag/service/bot_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,19 @@ async def _create_bot_atomically(session):

return await self.build_bot_response(bot)

async def list_bots(self, user: str) -> view_models.BotList:
bots = await self.db_ops.query_bots([user])
return BotList(items=[await self.build_bot_response(bot) for bot in bots])
async def list_bots(self, user: str, offset: int = 0, limit: int = 50) -> view_models.OffsetPaginatedResponse[view_models.Bot]:
"""List bots with offset-based pagination"""
from aperag.utils.offset_pagination import OffsetPaginationHelper

# Get total count
total = await self.db_ops.query_bots_count([user])

# Get paginated results
bots = await self.db_ops.query_bots([user], offset=offset, limit=limit)

# Build response
bot_responses = [await self.build_bot_response(bot) for bot in bots]
return OffsetPaginationHelper.build_response(bot_responses, total, offset, limit)

async def get_bot(self, user: str, bot_id: str) -> view_models.Bot:
bot = await self.db_ops.query_bot(user, bot_id)
Expand Down
49 changes: 49 additions & 0 deletions aperag/service/chat_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,55 @@ async def _execute_paginated_query(session):

return await self.db_ops._execute_query(_execute_paginated_query)

async def list_chats_offset(
self,
user: str,
bot_id: str,
offset: int = 0,
limit: int = 50,
) -> view_models.OffsetPaginatedResponse[view_models.Chat]:
"""List chats with offset-based pagination."""
from aperag.utils.offset_pagination import OffsetPaginationHelper

# Define sort field mapping
sort_mapping = {
"created": db_models.Chat.gmt_created,
}

async def _execute_paginated_query(session):
from sqlalchemy import and_, desc, select, func

# Build base query
query = select(db_models.Chat).where(
and_(
db_models.Chat.user == user,
db_models.Chat.bot_id == bot_id,
db_models.Chat.status != db_models.ChatStatus.DELETED,
)
)

# Get total count
count_query = select(func.count()).select_from(query.subquery())
total = await session.scalar(count_query) or 0

# Apply sorting and pagination
query = query.order_by(desc(db_models.Chat.gmt_created))
query = query.offset(offset).limit(limit)

# Execute query
result = await session.execute(query)
chats = result.scalars().all()

# Build chat responses
chat_responses = []
for chat in chats:
chat_responses.append(self.build_chat_response(chat))

return chat_responses, total

chats, total = await self.db_ops._execute_query(_execute_paginated_query)
return OffsetPaginationHelper.build_response(chats, total, offset, limit)

async def get_chat(self, user: str, bot_id: str, chat_id: str) -> view_models.ChatDetails:
# Import here to avoid circular imports
from aperag.utils.history import query_chat_messages
Expand Down
84 changes: 84 additions & 0 deletions aperag/service/collection_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,90 @@ async def list_collections_view(
items=paginated_items, pageResult=view_models.PageResult(total=len(items), page=page, page_size=page_size)
)

async def list_collections_view_offset(
self, user_id: str, include_subscribed: bool = True, offset: int = 0, limit: int = 50
) -> view_models.OffsetPaginatedResponse[view_models.CollectionView]:
"""
Get user's collection list with offset-based pagination

Args:
user_id: User ID
include_subscribed: Whether to include subscribed collections, default True
offset: Number of items to skip from the beginning
limit: Maximum number of items to return
"""
from aperag.utils.offset_pagination import OffsetPaginationHelper

items = []

# 1. Get user's owned collections with marketplace info
owned_collections_data = await self.db_ops.query_collections_with_marketplace_info(user_id)

for row in owned_collections_data:
is_published = row.marketplace_status == "PUBLISHED"
items.append(
view_models.CollectionView(
id=row.id,
title=row.title,
description=row.description,
type=row.type,
status=row.status,
created=row.gmt_created,
updated=row.gmt_updated,
is_published=is_published,
published_at=row.published_at if is_published else None,
owner_user_id=row.user,
owner_username=row.owner_username,
subscription_id=None, # Own collection, subscription_id is None
subscribed_at=None,
)
)

# 2. Get subscribed collections if needed (optimized - no N+1 queries)
if include_subscribed:
try:
# Get subscribed collections data with all needed fields in one query
subscribed_collections_data, _ = await self.db_ops.list_user_subscribed_collections(
user_id,
page=1,
page_size=1000, # Get all subscriptions for now
)

for data in subscribed_collections_data:
is_published = data["marketplace_status"] == "PUBLISHED"
items.append(
view_models.CollectionView(
id=data["id"],
title=data["title"],
description=data["description"],
type=data["type"],
status=data["status"],
created=data["gmt_created"],
updated=data["gmt_updated"],
is_published=is_published,
published_at=data["published_at"] if is_published else None,
owner_user_id=data["owner_user_id"],
owner_username=data["owner_username"],
subscription_id=data["subscription_id"],
subscribed_at=data["gmt_subscribed"],
)
)
except Exception as e:
# If getting subscriptions fails, log and continue with owned collections
import logging

logger = logging.getLogger(__name__)
logger.warning(f"Failed to get subscribed collections for user {user_id}: {e}")

# 3. Sort by update time
items.sort(key=lambda x: x.updated or x.created, reverse=True)

# 4. Apply offset-based pagination
total = len(items)
paginated_items = items[offset:offset + limit] if offset < total else []

return OffsetPaginationHelper.build_response(paginated_items, total, offset, limit)

async def get_collection(self, user: str, collection_id: str) -> view_models.Collection:
from aperag.exceptions import CollectionNotFoundException

Expand Down
Loading
Loading