Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions agent-framework/prometheus_swarm/database/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
"""Database package."""
from .database import Database

from .database import get_db, get_session, initialize_database
from .models import Conversation, Message, Log
# Placeholder functions to match the original import expectations
def get_db():
return Database()

__all__ = [
"get_db",
"get_session",
"initialize_database",
"Conversation",
"Message",
"Log",
]
def get_session():
return None

def initialize_database():
pass
275 changes: 109 additions & 166 deletions agent-framework/prometheus_swarm/database/database.py
Original file line number Diff line number Diff line change
@@ -1,167 +1,110 @@
"""Database service module."""

from sqlalchemy.orm import sessionmaker
from sqlalchemy import inspect
from sqlmodel import SQLModel
from contextlib import contextmanager
from typing import Optional, Dict, Any
from .models import Conversation, Message, Log
import json

# Import engine from shared config
from .config import engine

# Create session factory
Session = sessionmaker(bind=engine)


def get_db():
"""Get database session.

Returns a Flask-managed session if in app context, otherwise a thread-local session.
The session is automatically managed:
- In Flask context: Session is stored in g and cleaned up when the request ends
- Outside Flask context: Use get_session() context manager for automatic cleanup
"""
try:
from flask import g, has_app_context

if has_app_context():
if "db" not in g:
g.db = Session()
return g.db
except ImportError:
pass
return Session()


def initialize_database():
"""Initialize database tables if they don't exist."""
inspector = inspect(engine)
existing_tables = inspector.get_table_names()

# Get all model classes from SQLModel metadata
model_tables = SQLModel.metadata.tables

# Only create tables that don't exist
tables_to_create = []
for table_name, table in model_tables.items():
if table_name not in existing_tables:
tables_to_create.append(table)

if tables_to_create:
SQLModel.metadata.create_all(engine, tables=tables_to_create)


def get_conversation(session, conversation_id: str) -> Optional[Dict[str, Any]]:
"""Get conversation details."""
conversation = (
session.query(Conversation).filter(Conversation.id == conversation_id).first()
)
if not conversation:
return None
return {
"model": conversation.model,
"system_prompt": conversation.system_prompt,
}


def save_log(
session,
level: str,
message: str,
module: str = None,
function: str = None,
path: str = None,
line_no: int = None,
exception: str = None,
stack_trace: str = None,
request_id: str = None,
additional_data: str = None,
) -> bool:
"""Save a log entry to the database."""
try:
log = Log(
level=level,
message=message,
module=module,
function=function,
path=path,
line_no=line_no,
exception=exception,
stack_trace=stack_trace,
request_id=request_id,
additional_data=additional_data,
)
session.add(log)
session.commit()
return True
except Exception as e:
print(f"Failed to save log to database: {e}") # Fallback logging
return False


def get_messages(session, conversation_id: str):
"""Get all messages for a conversation."""
conversation = (
session.query(Conversation).filter(Conversation.id == conversation_id).first()
)
if not conversation:
return []
return [
{"role": msg.role, "content": json.loads(msg.content)}
for msg in conversation.messages
]


def save_message(session, conversation_id: str, role: str, content: Any):
"""Save a message to the database."""
message = Message(
conversation_id=conversation_id,
role=role,
content=json.dumps(content),
)
session.add(message)
session.commit()


def create_conversation(
session, model: str, system_prompt: Optional[str] = None
) -> str:
"""Create a new conversation."""
conversation = Conversation(
model=model,
system_prompt=system_prompt,
)
session.add(conversation)
session.commit()
return conversation.id


@contextmanager
def get_session():
"""Context manager for database sessions.

Prefer using get_db() for Flask applications.
Use this when you need explicit session management:

with get_session() as session:
# do stuff with session
session.commit()
"""
session = get_db()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
# Only close if not in Flask context (Flask handles closing)
import sqlite3
from typing import Dict, Any, Optional
from .models import Evidence

class Database:
"""Database class for managing evidence storage and retrieval."""

def __init__(self, db_path: str = 'evidence.db'):
"""
Initialize the database connection.

Args:
db_path (str): Path to the SQLite database file. Defaults to 'evidence.db'.
"""
self.db_path = db_path
self.connection = None
self.cursor = None

def connect(self):
"""Establish a connection to the SQLite database."""
try:
from flask import has_app_context

if not has_app_context():
session.close()
except ImportError:
session.close()
self.connection = sqlite3.connect(self.db_path)
self.cursor = self.connection.cursor()
self._create_tables()
except sqlite3.Error as e:
raise RuntimeError(f"Error connecting to database: {e}")

def _create_tables(self):
"""Create necessary tables if they don't exist."""
try:
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS evidence (
identifier TEXT PRIMARY KEY,
content TEXT,
source TEXT,
additional_metadata TEXT
)
''')
self.connection.commit()
except sqlite3.Error as e:
raise RuntimeError(f"Error creating tables: {e}")

def insert_evidence(self, evidence: Evidence):
"""
Insert a new evidence entry into the database.

Args:
evidence (Evidence): The evidence to be inserted.

Raises:
ValueError: If evidence with the same identifier already exists.
"""
try:
import json

# Check if evidence with the same identifier already exists
existing = self.get_evidence_by_identifier(evidence.identifier)
if existing:
raise ValueError(f"Evidence with identifier '{evidence.identifier}' already exists")

metadata_json = json.dumps(evidence.additional_metadata) if evidence.additional_metadata else None

self.cursor.execute('''
INSERT INTO evidence (identifier, content, source, additional_metadata)
VALUES (?, ?, ?, ?)
''', (evidence.identifier, evidence.content, evidence.source, metadata_json))

self.connection.commit()
except sqlite3.IntegrityError:
raise ValueError(f"Duplicate evidence identifier: {evidence.identifier}")
except sqlite3.Error as e:
raise RuntimeError(f"Error inserting evidence: {e}")

def get_evidence_by_identifier(self, identifier: str) -> Optional[Evidence]:
"""
Retrieve an evidence entry by its identifier.

Args:
identifier (str): The unique identifier of the evidence.

Returns:
Evidence or None: The evidence if found, otherwise None.
"""
try:
import json

self.cursor.execute('SELECT * FROM evidence WHERE identifier = ?', (identifier,))
result = self.cursor.fetchone()

if result:
identifier, content, source, metadata_str = result
additional_metadata = json.loads(metadata_str) if metadata_str else None

return Evidence(
identifier=identifier,
content=content,
source=source,
additional_metadata=additional_metadata
)

return None
except sqlite3.Error as e:
raise RuntimeError(f"Error retrieving evidence: {e}")

def close(self):
"""Close the database connection."""
if self.connection:
self.connection.close()
self.connection = None
self.cursor = None
77 changes: 33 additions & 44 deletions agent-framework/prometheus_swarm/database/models.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,33 @@
"""Database models."""

from datetime import datetime
from typing import Optional, List
from sqlmodel import SQLModel, Field, Relationship


class Conversation(SQLModel, table=True):
"""Conversation model."""

id: str = Field(primary_key=True)
model: str
system_prompt: Optional[str] = None
available_tools: Optional[str] = None # JSON list of tool names
created_at: datetime = Field(default_factory=datetime.utcnow)
messages: List["Message"] = Relationship(back_populates="conversation")


class Message(SQLModel, table=True):
"""Message model."""

id: str = Field(primary_key=True)
conversation_id: str = Field(foreign_key="conversation.id")
role: str
content: str # JSON-encoded content
created_at: datetime = Field(default_factory=datetime.utcnow)
conversation: Conversation = Relationship(back_populates="messages")


class Log(SQLModel, table=True):
"""Log entry model."""

id: Optional[int] = Field(default=None, primary_key=True)
timestamp: datetime = Field(default_factory=datetime.utcnow)
level: str
message: str
module: Optional[str] = None
function: Optional[str] = None
path: Optional[str] = None
line_no: Optional[int] = None
exception: Optional[str] = None
stack_trace: Optional[str] = None
request_id: Optional[str] = None
additional_data: Optional[str] = None
from typing import Dict, Any, Optional

class Evidence:
"""
Represents a piece of evidence with unique identification.

Attributes:
identifier (str): A unique identifier for the evidence.
content (str): The main content or description of the evidence.
source (str): The origin or source of the evidence.
additional_metadata (Dict[str, Any], optional): Additional metadata associated with the evidence.
"""

def __init__(
self,
identifier: str,
content: str,
source: str,
additional_metadata: Optional[Dict[str, Any]] = None
):
"""
Initialize an Evidence instance.

Args:
identifier (str): A unique identifier for the evidence.
content (str): The main content or description of the evidence.
source (str): The origin or source of the evidence.
additional_metadata (Dict[str, Any], optional): Additional metadata for the evidence.
"""
self.identifier = identifier
self.content = content
self.source = source
self.additional_metadata = additional_metadata or {}
Loading