Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions hub-server/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Union, Generator
from typing import Optional, Dict, Union
from config import (
SQLITE_TIMEOUT, POWER_FACTOR, MAINS_VOLTAGE, ENERGY_MONTHLY_RESET, SQLITE_RETRIES, SQLITE_RETRY_DELAY
)
Expand Down Expand Up @@ -38,9 +38,11 @@ def __init__(self, db_path: Union[str, Path]):
logging.info(f"Database initialized at path: {self.db_path}")

def _connect(self):
if self._conn is None:
if not self.db_path.parent.exists():
raise RuntimeError("Database directory missing")
if self._conn is not None:
return

if not self.db_path.parent.exists():
raise RuntimeError("Database directory missing")

self._conn = sqlite3.connect(
self.db_path,
Expand All @@ -57,27 +59,29 @@ def _get_connection(self):


@contextmanager
def __get_connection_cm(self) -> Generator[sqlite3.Connection, None, None]:
def __get_connection_cm(self):
"""
Context manager providing a thread-safe single connection with auto-reconnect.
"""
for attempt in range(1, SQLITE_RETRIES + 1):
try:
self._connect()
with self._conn_lock:
self._connect()
yield self._conn
return
except (sqlite3.OperationalError, sqlite3.DatabaseError) as e:
logging.warning(f"DB connection error (attempt {attempt}/{SQLITE_RETRIES}): {e}")
if self._conn:
try:
self._conn.close()
except Exception:
pass
self._conn = None
with self._conn_lock:
if self._conn:
try:
self._conn.close()
except Exception:
pass
self._conn = None
if attempt < SQLITE_RETRIES:
time.sleep(SQLITE_RETRY_DELAY)


raise RuntimeError("Could not acquire DB connection after retries")


Expand All @@ -98,6 +102,7 @@ def setup(self) -> None:
cursor = conn.cursor()

# Enable WAL + busy timeout
self._conn.execute("PRAGMA synchronous=NORMAL;")
cursor.execute("PRAGMA journal_mode=WAL;")
cursor.execute("PRAGMA busy_timeout = 5000;")

Expand Down Expand Up @@ -293,7 +298,7 @@ def truncate_old_data(self, months: int) -> int:
conn.commit()

# Reclaim space
cursor.execute("VACUUM")
conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")

if deleted_count > 0:
logging.info(f"Truncated {deleted_count} old records (older than {cutoff_date.strftime('%Y-%m-%d')})")
Expand Down