@@ -782,236 +782,6 @@ class _NameGroup:
782782
783783 return entity_ids
784784
async def resolve_entity(
    self,
    bank_id: str,
    entity_text: str,
    context: str,
    nearby_entities: list[dict],
    unit_event_date,
) -> str:
    """
    Resolve an entity mention to a canonical entity ID.

    Candidates are existing entities of the same bank whose canonical name
    matches the mention (case-insensitive equality, substring, or prefix).
    Each candidate gets a score in [0, 1] made of:

    * name similarity (SequenceMatcher ratio), weight 0.5
    * share of nearby entities that previously co-occurred with the
      candidate, weight 0.3
    * temporal proximity of ``unit_event_date`` to the candidate's
      ``last_seen`` (linear decay over 7 days), weight 0.2

    The best candidate is reused when its score exceeds 0.6; otherwise a
    new entity is created via ``_create_entity``.

    Args:
        bank_id: bank ID (entities are scoped to a bank)
        entity_text: Entity text ("Alice", "Google", etc.)
        context: Context where entity appears (currently unused by scoring)
        nearby_entities: Other entities in the same unit; each dict must
            have a ``"text"`` key
        unit_event_date: When this unit was created. May be ``None``, in
            which case temporal scoring is skipped and the database clock
            is used for ``last_seen``.

    Returns:
        Entity ID (creates new entity if needed)
    """
    entities_table = fq_table("entities")

    # Built once: the statement text does not depend on the candidate.
    cooccurrence_query = f"""
        SELECT e.canonical_name, ec.cooccurrence_count
        FROM {fq_table("entity_cooccurrences")} ec
        JOIN {entities_table} e ON (
            CASE
                WHEN ec.entity_id_1 = $1 THEN ec.entity_id_2
                WHEN ec.entity_id_2 = $1 THEN ec.entity_id_1
            END = e.id
        )
        WHERE ec.entity_id_1 = $1 OR ec.entity_id_2 = $1
    """

    # Normalize to UTC-aware once to avoid naive/aware mismatch
    # (Oracle returns naive datetimes from fromisoformat).
    event_time = None
    if unit_event_date is not None:
        event_time = unit_event_date if unit_event_date.tzinfo else unit_event_date.replace(tzinfo=UTC)

    nearby_entity_set = {e["text"].lower() for e in nearby_entities if e["text"] != entity_text}
    mention_lower = entity_text.lower()

    async with acquire_with_retry(self.pool) as conn:
        # Find candidate entities with similar name
        candidates = await conn.fetch(
            f"""
            SELECT id, canonical_name, metadata, last_seen
            FROM {entities_table}
            WHERE bank_id = $1
              AND (
                canonical_name ILIKE $2
                OR canonical_name ILIKE $3
                OR $2 ILIKE canonical_name || '%%'
              )
            ORDER BY mention_count DESC
            """,
            bank_id,
            entity_text,
            f"%{entity_text}%",
        )

        if not candidates:
            # New entity - create it
            return await self._create_entity(conn, bank_id, entity_text, unit_event_date)

        best_candidate = None
        best_score = 0.0

        for row in candidates:
            candidate_id = row["id"]
            last_seen = row["last_seen"]

            # 1. Name similarity (contributes 0-0.5)
            name_similarity = SequenceMatcher(None, mention_lower, row["canonical_name"].lower()).ratio()
            score = name_similarity * 0.5

            # 2. Co-occurring entities (contributes 0-0.3)
            # Uses the materialized co-occurrence cache for fast lookup.
            if nearby_entity_set:
                co_entity_rows = await conn.fetch(cooccurrence_query, candidate_id)
                co_entities = {r["canonical_name"].lower() for r in co_entity_rows}
                overlap = len(nearby_entity_set & co_entities)
                score += (overlap / len(nearby_entity_set)) * 0.3

            # 3. Temporal proximity (contributes 0-0.2); skipped when either
            # timestamp is missing.
            if last_seen and event_time is not None:
                seen_time = last_seen if last_seen.tzinfo else last_seen.replace(tzinfo=UTC)
                days_diff = abs((event_time - seen_time).total_seconds()) / 86400
                if days_diff < 7:  # Within a week
                    score += (1.0 - days_diff / 7) * 0.2

            if score > best_score:
                best_score = score
                best_candidate = candidate_id

        # Threshold for considering it the same entity
        threshold = 0.6

        if best_score > threshold:
            # Same entity: bump its counters. COALESCE mirrors _create_entity
            # so a missing event date never writes NULL into last_seen.
            await conn.execute(
                f"""
                UPDATE {entities_table}
                SET mention_count = mention_count + 1,
                    last_seen = COALESCE($1, now())
                WHERE id = $2
                """,
                unit_event_date,
                best_candidate,
            )
            return best_candidate

        # Not confident - create new entity
        return await self._create_entity(conn, bank_id, entity_text, unit_event_date)
909-
async def _create_entity(
    self,
    conn,
    bank_id: str,
    entity_text: str,
    event_date,
) -> str:
    """
    Insert a new entity, or bump the existing one with the same name.

    A single INSERT ... ON CONFLICT statement is used so that two
    concurrent transactions creating the same (bank_id, lower-cased name)
    entity end up sharing one row: the loser of the race increments
    ``mention_count`` and overwrites ``last_seen`` instead of failing.

    Args:
        conn: Database connection
        bank_id: bank ID
        entity_text: Entity text, stored as the canonical name on insert
        event_date: When first seen; the database's ``now()`` is used
            when this is NULL/None

    Returns:
        ID of the inserted or updated entity row
    """
    entities = fq_table("entities")
    upsert_sql = f"""
        INSERT INTO {entities} (bank_id, canonical_name, first_seen, last_seen, mention_count)
        VALUES ($1, $2, COALESCE($3, now()), COALESCE($4, now()), 1)
        ON CONFLICT (bank_id, LOWER(canonical_name))
        DO UPDATE SET
            mention_count = {entities} .mention_count + 1,
            last_seen = EXCLUDED.last_seen
        RETURNING id
    """
    # event_date is bound twice: once for first_seen ($3), once for last_seen ($4).
    return await conn.fetchval(upsert_sql, bank_id, entity_text, event_date, event_date)
948-
async def link_unit_to_entity(self, unit_id: str, entity_id: str):
    """
    Attach an entity to a memory unit and refresh the co-occurrence cache.

    After the link row is written (a duplicate link is ignored), every
    other entity already linked to the same unit is paired with
    ``entity_id`` and the pair's co-occurrence counter is updated.

    Args:
        unit_id: Memory unit ID
        entity_id: Entity ID
    """
    links_table = fq_table("unit_entities")
    insert_link_sql = f"""
        INSERT INTO {links_table} (unit_id, entity_id)
        VALUES ($1, $2)
        ON CONFLICT DO NOTHING
    """
    select_siblings_sql = f"""
        SELECT entity_id
        FROM {links_table}
        WHERE unit_id = $1 AND entity_id != $2
    """

    async with acquire_with_retry(self.pool) as conn:
        await conn.execute(insert_link_sql, unit_id, entity_id)

        # Entities that share this unit with the one just linked.
        sibling_rows = await conn.fetch(select_siblings_sql, unit_id, entity_id)
        sibling_ids = [sibling["entity_id"] for sibling in sibling_rows]

        # One cache update per (entity, sibling) pair, issued sequentially
        # on the same connection.
        for sibling_id in sibling_ids:
            await self._update_cooccurrence(conn, entity_id, sibling_id)
986-
async def _update_cooccurrence(self, conn, entity_id_1: str, entity_id_2: str):
    """
    Update the co-occurrence cache for two entities.

    The pair is stored with the smaller ID first, matching the table's
    CHECK constraint ordering (entity_id_1 < entity_id_2), so each
    unordered pair maps to exactly one row. A new pair is inserted with a
    count of 1; an existing pair has its count incremented and its
    ``last_cooccurred`` timestamp refreshed.

    A pair of identical IDs is ignored: an entity does not co-occur with
    itself, and such a row could not satisfy the ordering constraint.

    Args:
        conn: Database connection
        entity_id_1: First entity ID
        entity_id_2: Second entity ID
    """
    # Compare string forms so a str ID and a driver-native UUID object can
    # be ordered against each other; the original values are what get bound.
    # NOTE(review): assumes string ordering of the IDs agrees with the
    # database's ordering for the ID column (true for canonical lowercase
    # UUID text) - confirm.
    key_1, key_2 = str(entity_id_1), str(entity_id_2)
    if key_1 == key_2:
        return

    # Ensure consistent ordering (smaller UUID first)
    if key_1 > key_2:
        entity_id_1, entity_id_2 = entity_id_2, entity_id_1

    await conn.execute(
        f"""
        INSERT INTO {fq_table("entity_cooccurrences")} (entity_id_1, entity_id_2, cooccurrence_count, last_cooccurred)
        VALUES ($1, $2, 1, NOW())
        ON CONFLICT (entity_id_1, entity_id_2)
        DO UPDATE SET
            cooccurrence_count = {fq_table("entity_cooccurrences")} .cooccurrence_count + 1,
            last_cooccurred = NOW()
        """,
        entity_id_1,
        entity_id_2,
    )
1014-
1015785 async def link_units_to_entities_batch (
1016786 self ,
1017787 unit_entity_pairs : list [tuple [str , str ]] | list [tuple [str , str , datetime | None ]],
0 commit comments