From 9b7b8e720c9f62363c671122921c8007e2c7ee4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 18 Mar 2026 15:49:01 +0800 Subject: [PATCH 1/6] fix: optimize search_by_embedding --- src/memos/graph_dbs/polardb.py | 22 +++++++++---------- .../mem_scheduler/schemas/general_schemas.py | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 6db31990d..62b006df1 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -180,10 +180,10 @@ def __init__(self, config: PolarDBGraphDBConfig): ) self._semaphore = threading.BoundedSemaphore(maxconn) - if self._warm_up_on_startup_by_full: - self._warm_up_search_connections_by_full() - if self._warm_up_on_startup_by_all: - self._warm_up_connections_by_all() + # if self._warm_up_on_startup_by_full: + # self._warm_up_search_connections_by_full() + # if self._warm_up_on_startup_by_all: + # self._warm_up_connections_by_all() """ # Handle auto_create @@ -1840,9 +1840,8 @@ def search_by_embedding( **kwargs, ) -> list[dict]: logger.info( - "search_by_embedding user_name:%s,filter: %s, knowledgebase_ids: %s,scope:%s,status:%s,search_filter:%s,filter:%s,knowledgebase_ids:%s,return_fields:%s", + "search_by_embedding by user_name:%s,knowledgebase_ids: %s,scope:%s,status:%s,search_filter:%s,filter:%s,knowledgebase_ids:%s,return_fields:%s", user_name, - filter, knowledgebase_ids, scope, status, @@ -1895,20 +1894,21 @@ def search_by_embedding( where_clause = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else "" query = f""" + set hnsw.ef_search = 100;set hnsw.iterative_scan = relaxed_order; WITH t AS ( SELECT id, properties, timeline, ag_catalog.agtype_access_operator(properties, '"id"'::agtype) AS old_id, - (1 - (embedding <=> %s::vector(1024))) AS scope + (embedding <=> %s::vector(1024)) AS scope_distance FROM "{self.db_name}_graph"."Memory" {where_clause} - ORDER BY scope DESC + ORDER BY scope_distance ASC LIMIT {top_k} ) - SELECT * + SELECT *,(1 - scope_distance) AS scope FROM t - WHERE scope > 0.1; + WHERE scope_distance < 0.9; """ vector_str = convert_to_vector(vector) query = query.replace("%s::vector(1024)", f"'{vector_str}'::vector(1024)") @@ -1953,7 +1953,7 @@ def search_by_embedding( output.append(item) elapsed_time = (time.perf_counter() - start_time) * 1000.0 logger.info( - "search_by_embedding query embedding completed time took %.1f ms", elapsed_time + "search_by_embedding query by embedding completed time took %.1f ms", elapsed_time ) return output[:top_k] diff --git a/src/memos/mem_scheduler/schemas/general_schemas.py b/src/memos/mem_scheduler/schemas/general_schemas.py index 06910ba17..cd44cd171 100644 --- a/src/memos/mem_scheduler/schemas/general_schemas.py +++ b/src/memos/mem_scheduler/schemas/general_schemas.py @@ -20,7 +20,7 @@ DEFAULT_DISPATCHER_MONITOR_CHECK_INTERVAL = 300 DEFAULT_DISPATCHER_MONITOR_MAX_FAILURES = 2 DEFAULT_STUCK_THREAD_TOLERANCE = 10 -DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE = -1 +DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE = 200 DEFAULT_TOP_K = 5 DEFAULT_CONTEXT_WINDOW_SIZE = 5 DEFAULT_USE_REDIS_QUEUE = os.getenv("MEMSCHEDULER_USE_REDIS_QUEUE", "False").lower() == "true" From 83972d08b9716f5c90a2eddf2c108f8d1aee021a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 18 Mar 2026 15:49:41 +0800 Subject: [PATCH 2/6] fix: optimize search_by_embedding --- src/memos/graph_dbs/polardb.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 62b006df1..0fd45f526 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -180,10 +180,10 @@ def __init__(self, config: PolarDBGraphDBConfig): ) self._semaphore = threading.BoundedSemaphore(maxconn) - # if self._warm_up_on_startup_by_full: - # self._warm_up_search_connections_by_full() - # if self._warm_up_on_startup_by_all: - # self._warm_up_connections_by_all() + if self._warm_up_on_startup_by_full: + self._warm_up_search_connections_by_full() + if self._warm_up_on_startup_by_all: + self._warm_up_connections_by_all() """ # Handle auto_create From c2c23497798bd1355446da0286ffe87c1ef41abe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 18 Mar 2026 17:37:09 +0800 Subject: [PATCH 3/6] feat:optimize get_grouped_counts --- src/memos/graph_dbs/polardb.py | 79 ++++++++++++++++------------------ 1 file changed, 38 insertions(+), 41 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 0fd45f526..7246b8836 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -432,14 +432,13 @@ def node_not_exist(self, scope: str, user_name: str | None = None) -> int: def remove_oldest_memory( self, memory_type: str, keep_latest: int, user_name: str | None = None ) -> None: - """ - Remove all WorkingMemory nodes except the latest `keep_latest` entries. - - Args: - memory_type (str): Memory type (e.g., 'WorkingMemory', 'LongTermMemory'). - keep_latest (int): Number of latest WorkingMemory entries to keep. - user_name (str, optional): User name for filtering in non-multi-db mode - """ + start_time = time.perf_counter() + logger.info( + "remove_oldest_memory by memory_type:%s,keep_latest: %s,user_name:%s", + memory_type, + keep_latest, + user_name + ) user_name = user_name if user_name else self._get_config_value("user_name") # Use actual OFFSET logic, consistent with nebular.py @@ -456,6 +455,7 @@ def remove_oldest_memory( self.format_param_value(user_name), keep_latest, ] + logger.info(f"remove_oldest_memory by select_query:{select_query},select_params:{select_params}") try: with self._get_connection() as conn, conn.cursor() as cursor: # Execute query to get IDs to delete @@ -482,6 +482,8 @@ def remove_oldest_memory( f"keeping {keep_latest} latest for user {user_name}, " f"removed ids: {ids_to_delete}" ) + elapsed = (time.perf_counter() - start_time) * 1000.0 + logger.info("remove_oldest_memory internal took %.1f ms", elapsed) except Exception as e: logger.error(f"[remove_oldest_memory] Failed: {e}", exc_info=True) raise @@ -2165,25 +2167,19 @@ def get_grouped_counts( params: dict[str, Any] | None = None, user_name: str | None = None, ) -> list[dict[str, Any]]: - """ - Count nodes grouped by any fields. - - Args: - group_fields (list[str]): Fields to group by, e.g., ["memory_type", "status"] - where_clause (str, optional): Extra WHERE condition. E.g., - "WHERE n.status = 'activated'" - params (dict, optional): Parameters for WHERE clause. - user_name (str, optional): User name for filtering in non-multi-db mode - - Returns: - list[dict]: e.g., [{ 'memory_type': 'WorkingMemory', 'status': 'active', 'count': 10 }, ...] - """ + start_time = time.perf_counter() + logger.info( + "get_grouped_counts by group_fields:%s,where_clause: %s,params:%s,user_name:%s", + group_fields, + where_clause, + params, + user_name, + ) if not group_fields: raise ValueError("group_fields cannot be empty") user_name = user_name if user_name else self._get_config_value("user_name") - # Build user clause user_clause = f"ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = '\"{user_name}\"'::agtype" if where_clause: where_clause = where_clause.strip() @@ -2194,44 +2190,43 @@ def get_grouped_counts( else: where_clause = f"WHERE {user_clause}" - # Inline parameters if provided if params and isinstance(params, dict): for key, value in params.items(): - # Handle different value types appropriately if isinstance(value, str): value = f"'{value}'" where_clause = where_clause.replace(f"${key}", str(value)) - # Handle user_name parameter in where_clause if "user_name = %s" in where_clause: where_clause = where_clause.replace( "user_name = %s", f"ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = '\"{user_name}\"'::agtype", ) - # Build return fields and group by fields - return_fields = [] - group_by_fields = [] - + cte_select_list = [] + aliases = [] for field in group_fields: alias = field.replace(".", "_") - return_fields.append( - f"ag_catalog.agtype_access_operator(properties, '\"{field}\"'::agtype)::text AS {alias}" - ) - group_by_fields.append( - f"ag_catalog.agtype_access_operator(properties, '\"{field}\"'::agtype)::text" + aliases.append(alias) + cte_select_list.append( + f"ag_catalog.agtype_access_operator(properties, '\"{field}\"'::agtype) AS {alias}" ) - - # Full SQL query construction + outer_select = ", ".join(f"{a}::text" for a in aliases) + outer_group_by = ", ".join(aliases) query = f""" - SELECT {", ".join(return_fields)}, COUNT(*) AS count - FROM "{self.db_name}_graph"."Memory" - {where_clause} - GROUP BY {", ".join(group_by_fields)} + WITH t AS ( + SELECT {", ".join(cte_select_list)} + FROM "{self.db_name}_graph"."Memory" + {where_clause} + LIMIT 1000 + ) + SELECT {outer_select}, count(*) AS count + FROM t + GROUP BY {outer_group_by} """ + logger.info(f"get_grouped_counts query:{query},params:{params}") + try: with self._get_connection() as conn, conn.cursor() as cursor: - # Handle parameterized query if params and isinstance(params, list): cursor.execute(query, params) else: @@ -2250,6 +2245,8 @@ def get_grouped_counts( count_value = row[-1] # Last column is count output.append({**group_values, "count": int(count_value)}) + elapsed = (time.perf_counter() - start_time) * 1000.0 + logger.info("get_grouped_counts internal took %.1f ms", elapsed) return output except Exception as e: From 53d06a1d8e43ea302850c5172fc7f556701e62d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 18 Mar 2026 17:38:56 +0800 Subject: [PATCH 4/6] feat:optimize get_grouped_counts --- src/memos/graph_dbs/polardb.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 7246b8836..f21615a23 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -437,7 +437,7 @@ def remove_oldest_memory( "remove_oldest_memory by memory_type:%s,keep_latest: %s,user_name:%s", memory_type, keep_latest, - user_name + user_name, ) user_name = user_name if user_name else self._get_config_value("user_name") @@ -455,7 +455,9 @@ def remove_oldest_memory( self.format_param_value(user_name), keep_latest, ] - logger.info(f"remove_oldest_memory by select_query:{select_query},select_params:{select_params}") + logger.info( + f"remove_oldest_memory by select_query:{select_query},select_params:{select_params}" + ) try: with self._get_connection() as conn, conn.cursor() as cursor: # Execute query to get IDs to delete From a6adfe6c67ea3c991a286ac2ff660aa84298d0f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 18 Mar 2026 18:02:02 +0800 Subject: [PATCH 5/6] feat:optimize get_by_metadata --- src/memos/graph_dbs/polardb.py | 37 ++++++---------------------------- 1 file changed, 6 insertions(+), 31 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index f21615a23..4d88844df 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -1970,30 +1970,13 @@ def get_by_metadata( knowledgebase_ids: list | None = None, user_name_flag: bool = True, ) -> list[str]: - """ - Retrieve node IDs that match given metadata filters. - Supports exact match. - - Args: - filters: List of filter dicts like: - [ - {"field": "key", "op": "in", "value": ["A", "B"]}, - {"field": "confidence", "op": ">=", "value": 80}, - {"field": "tags", "op": "contains", "value": "AI"}, - ... - ] - user_name (str, optional): User name for filtering in non-multi-db mode - - Returns: - list[str]: Node IDs whose metadata match the filter conditions. (AND logic). - """ + start_time = time.perf_counter() logger.info( f" get_by_metadata user_name:{user_name},filter: {filter}, knowledgebase_ids: {knowledgebase_ids},filters:{filters}" ) user_name = user_name if user_name else self._get_config_value("user_name") - # Build WHERE conditions for cypher query where_conditions = [] for f in filters: @@ -2001,18 +1984,13 @@ def get_by_metadata( op = f.get("op", "=") value = f["value"] - # Format value if isinstance(value, str): - # Escape single quotes using backslash when inside $$ dollar-quoted strings - # In $$ delimiters, Cypher string literals can use \' to escape single quotes escaped_str = value.replace("'", "\\'") escaped_value = f"'{escaped_str}'" elif isinstance(value, list): - # Handle list values - use double quotes for Cypher arrays list_items = [] for v in value: if isinstance(v, str): - # Escape double quotes in string values for Cypher escaped_str = v.replace('"', '\\"') list_items.append(f'"{escaped_str}"') else: @@ -2020,7 +1998,6 @@ def get_by_metadata( escaped_value = f"[{', '.join(list_items)}]" else: escaped_value = f"'{value}'" if isinstance(value, str) else str(value) - # Build WHERE conditions if op == "=": where_conditions.append(f"n.{field} = {escaped_value}") elif op == "in": @@ -2049,22 +2026,19 @@ def get_by_metadata( knowledgebase_ids=knowledgebase_ids, default_user_name=self._get_config_value("user_name"), ) - logger.info(f"[get_by_metadata] user_name_conditions: {user_name_conditions}") + logger.info(f"get_by_metadata user_name_conditions: {user_name_conditions}") - # Add user_name WHERE clause if user_name_conditions: if len(user_name_conditions) == 1: where_conditions.append(user_name_conditions[0]) else: where_conditions.append(f"({' OR '.join(user_name_conditions)})") - # Build filter conditions using common method filter_where_clause = self._build_filter_conditions_cypher(filter) - logger.info(f"[get_by_metadata] filter_where_clause: {filter_where_clause}") + logger.info(f"get_by_metadata filter_where_clause: {filter_where_clause}") where_str = " AND ".join(where_conditions) + filter_where_clause - # Use cypher query cypher_query = f""" SELECT * FROM cypher('{self.db_name}_graph', $$ MATCH (n:Memory) @@ -2074,7 +2048,7 @@ def get_by_metadata( """ ids = [] - logger.info(f"[get_by_metadata] cypher_query: {cypher_query}") + logger.info(f"get_by_metadata cypher_query: {cypher_query}") try: with self._get_connection() as conn, conn.cursor() as cursor: cursor.execute(cypher_query) @@ -2082,7 +2056,8 @@ def get_by_metadata( ids = [str(item[0]).strip('"') for item in results] except Exception as e: logger.warning(f"Failed to get metadata: {e}, query is {cypher_query}") - + elapsed = (time.perf_counter() - start_time) * 1000.0 + logger.info("get_by_metadata internal took %.1f ms", elapsed) return ids @timed From 1b1078a100bda0f970149ec91c7048192f032a2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 18 Mar 2026 18:46:45 +0800 Subject: [PATCH 6/6] fix: remove self._refresh_memory_size --- src/memos/memories/textual/tree_text_memory/organize/manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 96453f5a0..ecd58f309 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -114,7 +114,6 @@ def add( if mode == "sync": self._cleanup_working_memory(user_name) - self._refresh_memory_size(user_name=user_name) return added_ids