diff --git a/hindsight-api-slim/hindsight_api/engine/entity_resolver.py b/hindsight-api-slim/hindsight_api/engine/entity_resolver.py index d40e994d6..6c866e973 100644 --- a/hindsight-api-slim/hindsight_api/engine/entity_resolver.py +++ b/hindsight-api-slim/hindsight_api/engine/entity_resolver.py @@ -782,236 +782,6 @@ class _NameGroup: return entity_ids - async def resolve_entity( - self, - bank_id: str, - entity_text: str, - context: str, - nearby_entities: list[dict], - unit_event_date, - ) -> str: - """ - Resolve an entity to a canonical entity ID. - - Args: - bank_id: bank ID (entities are scoped to agents) - entity_text: Entity text ("Alice", "Google", etc.) - context: Context where entity appears - nearby_entities: Other entities in the same unit - unit_event_date: When this unit was created - - Returns: - Entity ID (creates new entity if needed) - """ - async with acquire_with_retry(self.pool) as conn: - # Find candidate entities with similar name - candidates = await conn.fetch( - f""" - SELECT id, canonical_name, metadata, last_seen - FROM {fq_table("entities")} - WHERE bank_id = $1 - AND ( - canonical_name ILIKE $2 - OR canonical_name ILIKE $3 - OR $2 ILIKE canonical_name || '%%' - ) - ORDER BY mention_count DESC - """, - bank_id, - entity_text, - f"%{entity_text}%", - ) - - if not candidates: - # New entity - create it - return await self._create_entity(conn, bank_id, entity_text, unit_event_date) - - # Score candidates based on: - # 1. Name similarity - # 2. Context overlap (TODO: could use embeddings) - # 3. Co-occurring entities - # 4. Temporal proximity - - best_candidate = None - best_score = 0.0 - - nearby_entity_set = {e["text"].lower() for e in nearby_entities if e["text"] != entity_text} - - for row in candidates: - candidate_id = row["id"] - canonical_name = row["canonical_name"] - last_seen = row["last_seen"] - score = 0.0 - - # 1. Name similarity (0-1) - name_similarity = SequenceMatcher(None, entity_text.lower(), canonical_name.lower()).ratio() - score += name_similarity * 0.5 - - # 2. Co-occurring entities (0-0.5) - # Get entities that co-occurred with this candidate before - # Use the materialized co-occurrence cache for fast lookup - co_entity_rows = await conn.fetch( - f""" - SELECT e.canonical_name, ec.cooccurrence_count - FROM {fq_table("entity_cooccurrences")} ec - JOIN {fq_table("entities")} e ON ( - CASE - WHEN ec.entity_id_1 = $1 THEN ec.entity_id_2 - WHEN ec.entity_id_2 = $1 THEN ec.entity_id_1 - END = e.id - ) - WHERE ec.entity_id_1 = $1 OR ec.entity_id_2 = $1 - """, - candidate_id, - ) - co_entities = {r["canonical_name"].lower() for r in co_entity_rows} - - # Check overlap with nearby entities - overlap = len(nearby_entity_set & co_entities) - if nearby_entity_set: - co_entity_score = overlap / len(nearby_entity_set) - score += co_entity_score * 0.3 - - # 3. Temporal proximity (0-0.2) - if last_seen: - # Normalize both to UTC-aware to avoid naive/aware mismatch - # (Oracle returns naive datetimes from fromisoformat) - _evt = unit_event_date if unit_event_date.tzinfo else unit_event_date.replace(tzinfo=UTC) - _seen = last_seen if last_seen.tzinfo else last_seen.replace(tzinfo=UTC) - days_diff = abs((_evt - _seen).total_seconds() / 86400) - if days_diff < 7: # Within a week - temporal_score = max(0, 1.0 - (days_diff / 7)) - score += temporal_score * 0.2 - - if score > best_score: - best_score = score - best_candidate = candidate_id - - # Threshold for considering it the same entity - threshold = 0.6 - - if best_score > threshold: - # Update entity - await conn.execute( - f""" - UPDATE {fq_table("entities")} - SET mention_count = mention_count + 1, - last_seen = $1 - WHERE id = $2 - """, - unit_event_date, - best_candidate, - ) - return best_candidate - else: - # Not confident - create new entity - return await self._create_entity(conn, bank_id, entity_text, unit_event_date) - - async def _create_entity( - self, - conn, - bank_id: str, - entity_text: str, - event_date, - ) -> str: - """ - Create a new entity or get existing one if it already exists. - - Uses INSERT ... ON CONFLICT to handle race conditions where - two concurrent transactions try to create the same entity. - - Args: - conn: Database connection - bank_id: bank ID - entity_text: Entity text - event_date: When first seen - - Returns: - Entity ID - """ - entity_id = await conn.fetchval( - f""" - INSERT INTO {fq_table("entities")} (bank_id, canonical_name, first_seen, last_seen, mention_count) - VALUES ($1, $2, COALESCE($3, now()), COALESCE($4, now()), 1) - ON CONFLICT (bank_id, LOWER(canonical_name)) - DO UPDATE SET - mention_count = {fq_table("entities")}.mention_count + 1, - last_seen = EXCLUDED.last_seen - RETURNING id - """, - bank_id, - entity_text, - event_date, - event_date, - ) - return entity_id - - async def link_unit_to_entity(self, unit_id: str, entity_id: str): - """ - Link a memory unit to an entity. - Also updates co-occurrence cache with other entities in the same unit. - - Args: - unit_id: Memory unit ID - entity_id: Entity ID - """ - async with acquire_with_retry(self.pool) as conn: - # Insert unit-entity link - await conn.execute( - f""" - INSERT INTO {fq_table("unit_entities")} (unit_id, entity_id) - VALUES ($1, $2) - ON CONFLICT DO NOTHING - """, - unit_id, - entity_id, - ) - - # Update co-occurrence cache: find other entities in this unit - rows = await conn.fetch( - f""" - SELECT entity_id - FROM {fq_table("unit_entities")} - WHERE unit_id = $1 AND entity_id != $2 - """, - unit_id, - entity_id, - ) - - other_entities = [row["entity_id"] for row in rows] - - # Update co-occurrences for each pair - for other_entity_id in other_entities: - await self._update_cooccurrence(conn, entity_id, other_entity_id) - - async def _update_cooccurrence(self, conn, entity_id_1: str, entity_id_2: str): - """ - Update the co-occurrence cache for two entities. - - Uses CHECK constraint ordering (entity_id_1 < entity_id_2) to avoid duplicates. - - Args: - conn: Database connection - entity_id_1: First entity ID - entity_id_2: Second entity ID - """ - # Ensure consistent ordering (smaller UUID first) - if entity_id_1 > entity_id_2: - entity_id_1, entity_id_2 = entity_id_2, entity_id_1 - - await conn.execute( - f""" - INSERT INTO {fq_table("entity_cooccurrences")} (entity_id_1, entity_id_2, cooccurrence_count, last_cooccurred) - VALUES ($1, $2, 1, NOW()) - ON CONFLICT (entity_id_1, entity_id_2) - DO UPDATE SET - cooccurrence_count = {fq_table("entity_cooccurrences")}.cooccurrence_count + 1, - last_cooccurred = NOW() - """, - entity_id_1, - entity_id_2, - ) - async def link_units_to_entities_batch( self, unit_entity_pairs: list[tuple[str, str]] | list[tuple[str, str, datetime | None]],