Skip to content
Open
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
ba4344d
test: add long-term memory test infrastructure
tyaroshko May 25, 2026
fac5d53
feat: add Fact data model for long-term memory
tyaroshko May 25, 2026
958384b
feat: add LongTermMemoryBackend abstract base class
tyaroshko May 25, 2026
add1c89
feat: add in-memory long-term memory backend
tyaroshko May 25, 2026
763abe7
feat: add LongTermMemory facade with remember, recall, forget
tyaroshko May 25, 2026
16354ea
feat: add long-term memory tools and factory
tyaroshko May 25, 2026
1feee37
feat: integrate long-term memory into Agent execution
tyaroshko May 25, 2026
b8252e6
feat: add pgvector-backed long-term memory backend
tyaroshko May 25, 2026
91e8efe
feat: add Qdrant-backed long-term memory backend
tyaroshko May 25, 2026
12013bd
fix: harden long-term memory against bandit findings
tyaroshko May 25, 2026
3e1a62e
refactor: drop module docstrings and dead comments from long-term memory
tyaroshko May 25, 2026
82db48d
docs: normalize long-term memory docstrings
tyaroshko May 25, 2026
3d1593d
refactor: replace string literals with ForgetStatus and MemoryToolKin…
tyaroshko May 25, 2026
5470ef7
refactor: use typed connection fields in long-term memory backends
tyaroshko May 26, 2026
e556184
refactor: align LongTermMemory facade and ABC with dynamiq Memory con…
tyaroshko May 26, 2026
bcc0267
feat: serialize Agent.long_term_memory through workflow YAML round-trip
tyaroshko May 26, 2026
620e1c9
refactor: adopt canonical node lifecycle and richer descriptions in l…
tyaroshko May 26, 2026
58fbd7c
feat: log when long-term memory tools attach to an agent run
tyaroshko May 26, 2026
f61ac31
fix: rename memory backends
tyaroshko May 27, 2026
6bf75ff
chore: update node names for memory backends
tyaroshko May 27, 2026
3659a66
feat: implement update() across long-term memory backends
tyaroshko May 27, 2026
30f8eea
feat: add Letta-style semantic upsert to LongTermMemory.remember
tyaroshko May 27, 2026
14d5822
refactor: drop forget tool and agent-optimize remember/recall outputs
tyaroshko May 27, 2026
4c4ffc0
test: cover semantic upsert, agent-optimized outputs, and removed for…
tyaroshko May 27, 2026
c415579
chore: remove redundant module-level docstrings
tyaroshko May 27, 2026
9112388
fix: exclude long_term_memory from LTM tool to_dict to avoid serializ…
tyaroshko May 27, 2026
537fba4
fix: drop 10k cap in qdrant delete_scope by using count + delete-by-f…
tyaroshko May 27, 2026
a4ff448
fix: serialize LongTermMemoryConfig.tools as strings for yaml round-trip
tyaroshko May 27, 2026
d98ef29
fix: restore Agent.tools even when prep steps before _run_agent raise
tyaroshko May 27, 2026
d6b14f3
fix: remove comments
tyaroshko May 27, 2026
ff9a5c0
fix: qdrant delete_scope deletes all on empty scope
tyaroshko May 27, 2026
b83b5c9
fix: update comments
tyaroshko May 27, 2026
2fe0c56
fix: preserve tools added mid-run when removing LTM tools
tyaroshko May 27, 2026
446a94a
fix: qdrant delete_scope returns exact deleted count via scroll+delet…
tyaroshko May 27, 2026
f8955a6
fix: propagate include_secure_params through LTM backends to connection
tyaroshko May 27, 2026
bbf083a
fix: set is_optimized_for_agents on per-run LTM tools
tyaroshko May 27, 2026
0eade8f
fix: init LTM embedder and serialize concurrent LTM tool windows
tyaroshko May 27, 2026
c7ce37d
fix: acquire LTM lock directly before try block to prevent leak on raise
tyaroshko May 28, 2026
7e36980
fix: skip enum members without a builder in build_long_term_memory_tools
tyaroshko May 28, 2026
46552b0
fix: hold LTM lock for every execute when LTM is configured
tyaroshko May 28, 2026
c5f667c
refactor: use ContextVar overlay for per-call LTM tools instead of lo…
tyaroshko May 28, 2026
611ea39
refactor: move LTM ops onto backend and rename LongTermMemory to Long…
tyaroshko May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dynamiq/connections/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ def to_dict(self, for_tracing: bool = False, **kwargs) -> dict:
Returns:
dict: A dictionary representation of the connection instance.
"""
# Swallow `include_secure_params` if a caller forwards it down — the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets shorten the large comments

# connection always serializes its credential fields (subject only to
# `for_tracing`), so the flag has no effect here but must not leak to
# `model_dump`, which raises on unknown kwargs.
kwargs.pop("include_secure_params", None)
if for_tracing:
return {"id": self.id, "type": self.type}
return self.model_dump(**kwargs)
Expand Down
14 changes: 14 additions & 0 deletions dynamiq/memory/long_term/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dynamiq.memory.long_term.base import LongTermMemoryBackend
from dynamiq.memory.long_term.long_term_memory import LongTermMemory, LongTermMemoryConfig
from dynamiq.memory.long_term.schemas import Fact
from dynamiq.memory.long_term.types import ForgetStatus, MemoryToolKind, RememberOutcome

__all__ = [
"Fact",
"ForgetStatus",
"LongTermMemory",
"LongTermMemoryBackend",
"LongTermMemoryConfig",
"MemoryToolKind",
"RememberOutcome",
]
5 changes: 5 additions & 0 deletions dynamiq/memory/long_term/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from dynamiq.memory.long_term.backends.in_memory import InMemoryLongTermMemoryBackend
from dynamiq.memory.long_term.backends.pgvector import PostgresLongTermMemoryBackend
from dynamiq.memory.long_term.backends.qdrant import QdrantLongTermMemoryBackend

__all__ = ["InMemoryLongTermMemoryBackend", "PostgresLongTermMemoryBackend", "QdrantLongTermMemoryBackend"]
91 changes: 91 additions & 0 deletions dynamiq/memory/long_term/backends/in_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from datetime import datetime

import numpy as np
from pydantic import PrivateAttr

from dynamiq.memory.long_term.base import LongTermMemoryBackend
from dynamiq.memory.long_term.schemas import Fact


class InMemoryLongTermMemoryBackend(LongTermMemoryBackend):
"""Dict + numpy-cosine backend. Loses data on restart."""

name: str = "in-memory-long-term-memory-backend"

_facts: dict[str, Fact] = PrivateAttr(default_factory=dict)
_vectors: dict[str, list[float]] = PrivateAttr(default_factory=dict)

def insert(self, fact: Fact, embedding: list[float]) -> None:
self._facts[fact.id] = fact
self._vectors[fact.id] = list(embedding)

def get(self, fact_id: str) -> Fact | None:
return self._facts.get(fact_id)

def get_by_hash(self, *, user_id: str, content_hash: str) -> Fact | None:
for fact in self._facts.values():
if fact.user_id == user_id and fact.hash == content_hash:
return fact
return None

def delete(self, fact_id: str) -> None:
self._facts.pop(fact_id, None)
self._vectors.pop(fact_id, None)

def update(
self,
fact_id: str,
*,
content: str,
content_hash: str,
embedding: list[float],
updated_at: datetime,
) -> None:
existing = self._facts.get(fact_id)
if existing is None:
return
self._facts[fact_id] = existing.model_copy(
update={"content": content, "hash": content_hash, "updated_at": updated_at}
)
self._vectors[fact_id] = list(embedding)

def search(
self, *, query_embedding: list[float],
scope: dict[str, str], limit: int,
) -> list[tuple[Fact, float]]:
if not self._facts:
return []

query = np.asarray(query_embedding, dtype=np.float64)
query_norm = np.linalg.norm(query) or 1.0

scored: list[tuple[Fact, float]] = []
for fact_id, fact in self._facts.items():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to do more efiicienly? I think it is better to vectorize it.

if not _matches_scope(fact, scope):
continue
vec = np.asarray(self._vectors[fact_id], dtype=np.float64)
vec_norm = np.linalg.norm(vec) or 1.0
cosine = float(np.dot(query, vec) / (query_norm * vec_norm))
scored.append((fact, cosine))

scored.sort(key=lambda pair: pair[1], reverse=True)
return scored[:limit]

def list_by_scope(
self, scope: dict[str, str], limit: int = 100,
) -> list[Fact]:
matched = [f for f in self._facts.values() if _matches_scope(f, scope)]
return matched[:limit]

def delete_scope(self, scope: dict[str, str]) -> int:
to_delete = [fid for fid, f in self._facts.items() if _matches_scope(f, scope)]
for fid in to_delete:
self.delete(fid)
return len(to_delete)


def _matches_scope(fact: Fact, scope: dict[str, str]) -> bool:
for key, value in scope.items():
if getattr(fact, key, None) != value:
return False
return True
236 changes: 236 additions & 0 deletions dynamiq/memory/long_term/backends/pgvector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
from datetime import datetime
from typing import Any

import psycopg
from pgvector.psycopg import register_vector
from psycopg.rows import dict_row
from psycopg.sql import SQL, Composed, Identifier
from psycopg.types.json import Jsonb
from pydantic import ConfigDict, Field, PrivateAttr

from dynamiq.connections import PostgreSQL as PostgreSQLConnection
from dynamiq.memory.long_term.base import LongTermMemoryBackend
from dynamiq.memory.long_term.schemas import Fact

_CREATE_EXTENSION_SQL = SQL("CREATE EXTENSION IF NOT EXISTS vector")

_CREATE_TABLE_TEMPLATE = SQL(
"""
CREATE TABLE IF NOT EXISTS {table} (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
hash TEXT NOT NULL,
user_id TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{{}}'::jsonb,
embedding vector({dim}) NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
)
"""
)

_CREATE_USER_ID_INDEX_TEMPLATE = SQL("CREATE INDEX IF NOT EXISTS {idx} ON {table} (user_id)")

_CREATE_USER_HASH_INDEX_TEMPLATE = SQL("CREATE UNIQUE INDEX IF NOT EXISTS {idx} ON {table} (user_id, hash)")


def _scope_where_clause(scope: dict[str, str]) -> tuple[Composed, list]:
"""Build a parameterised WHERE clause from a scope dict.

Keys are interpolated as `Identifier` (safe); values stay as `%s` placeholders
for the driver — never an f-string substitution.
"""
if not scope:
return SQL("TRUE"), []
clauses = [SQL("{key} = %s").format(key=Identifier(key)) for key in scope.keys()]
return SQL(" AND ").join(clauses), list(scope.values())


def _row_to_fact(row) -> Fact:
return Fact(
id=row["id"],
content=row["content"],
hash=row["hash"],
user_id=row["user_id"],
metadata=row["metadata"] or {},
created_at=row["created_at"],
updated_at=row["updated_at"],
)


_FACT_COLUMNS = SQL("id, content, hash, user_id, metadata, created_at, updated_at")


class PostgresLongTermMemoryBackend(LongTermMemoryBackend):
"""Long-term memory backend backed by Postgres + pgvector."""

model_config = ConfigDict(arbitrary_types_allowed=True)

name: str = "postgres-long-term-memory-backend"
connection: PostgreSQLConnection = Field(default_factory=PostgreSQLConnection)
table_name: str = "user_facts"
dimension: int = 1536

_conn: psycopg.Connection | None = PrivateAttr(default=None)

@property
def to_dict_exclude_params(self) -> dict[str, bool]:
return super().to_dict_exclude_params | {"_conn": True, "connection": True}

def to_dict(self, include_secure_params: bool = False, for_tracing: bool = False, **kwargs) -> dict[str, Any]:
exclude = kwargs.pop("exclude", self.to_dict_exclude_params.copy())
data = self.model_dump(exclude=exclude, **kwargs)
data["connection"] = self.connection.to_dict(
for_tracing=for_tracing, include_secure_params=include_secure_params, **kwargs
)
return data

def model_post_init(self, __context) -> None:
self._conn = self.connection.connect()
self._conn.autocommit = True
with self._conn.cursor() as cur:
cur.execute(_CREATE_EXTENSION_SQL)
register_vector(self._conn)

@property
def _table(self) -> Identifier:
"""Return the table name wrapped as a safe SQL identifier."""
return Identifier(self.table_name)

def ensure_table(self) -> None:
"""Create the facts table and indexes if absent. Safe to call repeatedly."""
with self._conn.cursor() as cur:
cur.execute(_CREATE_EXTENSION_SQL)
cur.execute(_CREATE_TABLE_TEMPLATE.format(table=self._table, dim=SQL(str(self.dimension))))
cur.execute(
_CREATE_USER_ID_INDEX_TEMPLATE.format(
idx=Identifier(f"{self.table_name}_user_id_idx"),
table=self._table,
)
)
cur.execute(
_CREATE_USER_HASH_INDEX_TEMPLATE.format(
idx=Identifier(f"{self.table_name}_user_hash_uidx"),
table=self._table,
)
)

def recreate_table(self) -> None:
"""Drop and re-create the facts table. Test-only helper."""
with self._conn.cursor() as cur:
cur.execute(SQL("DROP TABLE IF EXISTS {table}").format(table=self._table))
self.ensure_table()

def drop_table(self) -> None:
"""Drop the facts table if it exists. Test-only helper."""
with self._conn.cursor() as cur:
cur.execute(SQL("DROP TABLE IF EXISTS {table}").format(table=self._table))

def insert(self, fact: Fact, embedding: list[float]) -> None:
with self._conn.cursor() as cur:
cur.execute(
SQL("INSERT INTO {table} ({cols}, embedding) " "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)").format(
table=self._table, cols=_FACT_COLUMNS
),
(
fact.id,
fact.content,
fact.hash,
fact.user_id,
Jsonb(fact.metadata),
fact.created_at,
fact.updated_at,
embedding,
),
)

def get(self, fact_id: str) -> Fact | None:
with self._conn.cursor(row_factory=dict_row) as cur:
cur.execute(
SQL("SELECT {cols} FROM {table} WHERE id = %s").format(
cols=_FACT_COLUMNS,
table=self._table,
),
(fact_id,),
)
row = cur.fetchone()
return _row_to_fact(row) if row else None

def get_by_hash(self, *, user_id: str, content_hash: str) -> Fact | None:
with self._conn.cursor(row_factory=dict_row) as cur:
cur.execute(
SQL("SELECT {cols} FROM {table} WHERE user_id = %s AND hash = %s").format(
cols=_FACT_COLUMNS, table=self._table
),
(user_id, content_hash),
)
row = cur.fetchone()
return _row_to_fact(row) if row else None

def delete(self, fact_id: str) -> None:
with self._conn.cursor() as cur:
cur.execute(
SQL("DELETE FROM {table} WHERE id = %s").format(table=self._table),
(fact_id,),
)

def update(
self,
fact_id: str,
*,
content: str,
content_hash: str,
embedding: list[float],
updated_at: datetime,
) -> None:
with self._conn.cursor() as cur:
cur.execute(
SQL(
"UPDATE {table} SET content = %s, hash = %s, " "embedding = %s, updated_at = %s WHERE id = %s"
).format(table=self._table),
(content, content_hash, embedding, updated_at, fact_id),
)

def search(
self,
*,
query_embedding: list[float],
scope: dict[str, str],
limit: int,
) -> list[tuple[Fact, float]]:
where, params = _scope_where_clause(scope)
with self._conn.cursor(row_factory=dict_row) as cur:
cur.execute(
SQL(
"SELECT {cols}, 1 - (embedding <=> %s::vector) AS score "
"FROM {table} WHERE {where} "
"ORDER BY embedding <=> %s::vector LIMIT %s"
).format(cols=_FACT_COLUMNS, table=self._table, where=where),
[query_embedding] + params + [query_embedding, limit],
)
rows = cur.fetchall()
return [(_row_to_fact(row), float(row["score"])) for row in rows]

def list_by_scope(self, scope: dict[str, str], limit: int = 100) -> list[Fact]:
where, params = _scope_where_clause(scope)
with self._conn.cursor(row_factory=dict_row) as cur:
cur.execute(
SQL("SELECT {cols} FROM {table} WHERE {where} " "ORDER BY created_at DESC LIMIT %s").format(
cols=_FACT_COLUMNS, table=self._table, where=where
),
params + [limit],
)
rows = cur.fetchall()
return [_row_to_fact(row) for row in rows]

def delete_scope(self, scope: dict[str, str]) -> int:
where, params = _scope_where_clause(scope)
with self._conn.cursor() as cur:
cur.execute(
SQL("DELETE FROM {table} WHERE {where}").format(
table=self._table,
where=where,
),
params,
)
return cur.rowcount
Loading
Loading