Skip to content

Commit 96a1dd6

Browse files
authored
Dev 20260407 v2.0.13 (#1456)
## Description Please include a summary of the change, the problem it solves, the implementation approach, and relevant context. List any dependencies required for this change. Related Issue (Required): Fixes #issue_number ## Type of change Please delete options that are not relevant. - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Refactor (does not change functionality, e.g. code style improvements, linting) - [ ] Documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration - [ ] Unit Test - [ ] Test Script Or Test Steps (please provide) - [ ] Pipeline Automated API Test (please provide) ## Checklist - [ ] I have performed a self-review of my own code | 我已自行检查了自己的代码 - [ ] I have commented my code in hard-to-understand areas | 我已在难以理解的地方对代码进行了注释 - [ ] I have added tests that prove my fix is effective or that my feature works | 我已添加测试以证明我的修复有效或功能正常 - [ ] I have created related documentation issue/PR in [MemOS-Docs](https://github.com/MemTensor/MemOS-Docs) (if applicable) | 我已在 [MemOS-Docs](https://github.com/MemTensor/MemOS-Docs) 中创建了相关的文档 issue/PR(如果适用) - [ ] I have linked the issue to this PR (if applicable) | 我已将 issue 链接到此 PR(如果适用) - [ ] I have mentioned the person who will review this PR | 我已提及将审查此 PR 的人 ## Reviewer Checklist - [ ] closes #xxxx (Replace xxxx with the GitHub issue number) - [ ] Made sure Checks passed - [ ] Tests have been provided
2 parents df65802 + d2e7a04 commit 96a1dd6

File tree

12 files changed

+238
-37
lines changed

12 files changed

+238
-37
lines changed

apps/memos-local-plugin/adapters/hermes/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def _is_trivial(text: str) -> bool:
7878
if keys <= {"ok", "success", "status", "result", "error", "message"}:
7979
vals = list(obj.values())
8080
if all(
81-
isinstance(v, (bool, type(None))) or (isinstance(v, str) and len(v) < 20)
81+
isinstance(v, bool | type(None)) or (isinstance(v, str) and len(v) < 20)
8282
for v in vals
8383
):
8484
return True

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
##############################################################################
55

66
name = "MemoryOS"
7-
version = "2.0.12"
7+
version = "2.0.13"
88
description = "Intelligence Begins with Memory"
99
license = {text = "Apache-2.0"}
1010
readme = "README.md"

src/memos/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = "2.0.12"
1+
__version__ = "2.0.13"
22

33
from memos.configs.mem_cube import GeneralMemCubeConfig
44
from memos.configs.mem_os import MOSConfig

src/memos/mem_scheduler/base_mixins/queue_ops.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,13 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
6565
logger.warning("status_tracker.task_submitted failed", exc_info=True)
6666

6767
if self.disabled_handlers and msg.label in self.disabled_handlers:
68-
logger.info("Skipping disabled handler: %s - %s", msg.label, msg.content)
68+
logger.debug(
69+
"Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
70+
msg.label,
71+
msg.item_id,
72+
msg.user_id,
73+
msg.mem_cube_id,
74+
)
6975
continue
7076

7177
task_priority = self.orchestrator.get_task_priority(task_label=msg.label)
@@ -74,6 +80,14 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
7480
else:
7581
queued_msgs.append(msg)
7682

83+
logger.info(
84+
"Submit scheduler messages summary. total=%s immediate=%s queued=%s queue_backend=%s",
85+
len(messages),
86+
len(immediate_msgs),
87+
len(queued_msgs),
88+
"redis_queue" if self.use_redis_queue else "local_queue",
89+
)
90+
7791
if immediate_msgs:
7892
for m in immediate_msgs:
7993
emit_monitor_event(
@@ -199,6 +213,15 @@ def _message_consumer(self) -> None:
199213
if messages:
200214
self.dispatcher.on_messages_enqueued(messages)
201215

216+
if len(messages) >= self.consume_batch:
217+
unique_labels = sorted({msg.label for msg in messages})
218+
logger.debug(
219+
"Consumer dequeued batch. batch_size=%s consume_batch=%s unique_labels=%s queue_backend=%s",
220+
len(messages),
221+
self.consume_batch,
222+
unique_labels,
223+
"redis_queue" if self.use_redis_queue else "local_queue",
224+
)
202225
self.dispatcher.dispatch(messages)
203226
except Exception as e:
204227
logger.error("Error dispatching messages: %s", e)

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
226226
if task_item.item_id in self._running_tasks:
227227
task_item.mark_completed(result)
228228
del self._running_tasks[task_item.item_id]
229-
logger.info(f"Task completed: {task_item.get_execution_info()}")
229+
logger.debug(f"Task completed: {task_item.get_execution_info()}")
230230
return result
231231

232232
except Exception as e:
@@ -630,12 +630,12 @@ def execute_task(
630630
with self._task_lock:
631631
self._futures.add(future)
632632
future.add_done_callback(self._handle_future_result)
633-
logger.info(
633+
logger.debug(
634634
f"Dispatch {len(msgs)} message(s) to {task_label} handler for user {user_id} and mem_cube {mem_cube_id}."
635635
)
636636
else:
637637
# For synchronous execution, the wrapper will run and remove the task upon completion
638-
logger.info(
638+
logger.debug(
639639
f"Execute {len(msgs)} message(s) synchronously for {task_label} for user {user_id} and mem_cube {mem_cube_id}."
640640
)
641641
wrapped_handler(msgs)
@@ -653,6 +653,12 @@ def dispatch(self, msg_list: list[ScheduleMessageItem]):
653653

654654
# Group messages by user_id and mem_cube_id first
655655
user_cube_groups = group_messages_by_user_and_mem_cube(msg_list)
656+
logger.info(
657+
"Dispatcher received batch. total_messages=%s user_groups=%s unique_labels=%s",
658+
len(msg_list),
659+
len(user_cube_groups),
660+
sorted({msg.label for msg in msg_list}),
661+
)
656662

657663
# Process each user and mem_cube combination
658664
for user_id, cube_groups in user_cube_groups.items():

src/memos/mem_scheduler/task_schedule_modules/local_queue.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,12 @@ def put(
9595

9696
try:
9797
self.queue_streams[stream_key].put(item=message, block=block, timeout=timeout)
98-
logger.info(
99-
f"Message successfully put into queue '{stream_key}'. Current size: {self.queue_streams[stream_key].qsize()}"
98+
logger.debug(
99+
"Local queue enqueued. stream=%s size=%s label=%s item_id=%s",
100+
stream_key,
101+
self.queue_streams[stream_key].qsize(),
102+
message.label,
103+
message.item_id,
100104
)
101105
except Exception as e:
102106
logger.error(f"Failed to put message into queue '{stream_key}': {e}", exc_info=True)
@@ -117,7 +121,7 @@ def get(
117121

118122
# Return empty list if queue does not exist
119123
if stream_key not in self.queue_streams:
120-
logger.error(f"Stream {stream_key} does not exist when trying to get messages.")
124+
logger.debug("Stream %s does not exist when trying to get messages", stream_key)
121125
return []
122126

123127
# Ensure we always request a batch so we get a list back
@@ -174,6 +178,14 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
174178
fetched = self.get_nowait(stream_key=stream_key, batch_size=needed)
175179
messages.extend(fetched)
176180

181+
if messages and len(messages) >= batch_size:
182+
logger.debug(
183+
"Local queue dequeued batch. batch_size=%s requested_batch_size=%s active_streams=%s",
184+
len(messages),
185+
batch_size,
186+
len(stream_keys),
187+
)
188+
177189
return messages
178190

179191
def qsize(self) -> dict:
@@ -196,9 +208,11 @@ def clear(self, stream_key: str | None = None) -> None:
196208
if stream_key:
197209
if stream_key in self.queue_streams:
198210
self.queue_streams[stream_key].clear()
211+
logger.info("Cleared local queue stream: %s", stream_key)
199212
else:
200213
for queue in self.queue_streams.values():
201214
queue.clear()
215+
logger.info("Cleared all local queue streams. stream_count=%s", len(self.queue_streams))
202216

203217
@property
204218
def unfinished_tasks(self) -> int:

src/memos/mem_scheduler/task_schedule_modules/redis_queue.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,16 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
384384
if len(self.message_pack_cache) == 0:
385385
return []
386386
else:
387-
return self.message_pack_cache.popleft()
387+
batch = self.message_pack_cache.popleft()
388+
if len(batch) >= batch_size:
389+
logger.debug(
390+
"[REDIS_QUEUE] Dequeued batch. batch_size=%s requested_batch_size=%s cache_packs_remaining=%s stream_count=%s",
391+
len(batch),
392+
batch_size,
393+
len(self.message_pack_cache),
394+
len(self.get_stream_keys()),
395+
)
396+
return batch
388397

389398
def _ensure_consumer_group(self, stream_key) -> None:
390399
"""Ensure the consumer group exists for the stream."""
@@ -449,9 +458,13 @@ def put(
449458
message_id = self._redis_conn.xadd(
450459
stream_key, message_data, maxlen=self.max_len, approximate=True
451460
)
452-
453-
logger.info(
454-
f"Added message {message_id} to Redis stream: {message.label} - {message.content[:100]}..."
461+
logger.debug(
462+
"[REDIS_QUEUE] Enqueued message. message_id=%s stream=%s label=%s item_id=%s stream_cache_size=%s",
463+
message_id,
464+
stream_key,
465+
message.label,
466+
message.item_id,
467+
len(self._stream_keys_cache),
455468
)
456469

457470
except Exception as e:
@@ -494,7 +507,11 @@ def ack_message(
494507
# Optionally delete the message from the stream to keep it clean
495508
try:
496509
self._redis_conn.xdel(stream_key, redis_message_id)
497-
logger.info(f"Successfully delete acknowledged message {redis_message_id}")
510+
logger.debug(
511+
"[REDIS_QUEUE] Ack/delete message. redis_message_id=%s stream=%s",
512+
redis_message_id,
513+
stream_key,
514+
)
498515
except Exception as e:
499516
logger.warning(f"Failed to delete acknowledged message {redis_message_id}: {e}")
500517

@@ -989,7 +1006,7 @@ def show_task_status(self, stream_key_prefix: str | None = None) -> dict[str, di
9891006
)
9901007
stream_keys = self.get_stream_keys(stream_key_prefix=effective_prefix)
9911008
if not stream_keys:
992-
logger.info(f"No Redis streams found for the configured prefix: {effective_prefix}")
1009+
logger.debug(f"No Redis streams found for the configured prefix: {effective_prefix}")
9931010
return {}
9941011

9951012
grouped: dict[str, dict[str, int]] = {}
@@ -1157,7 +1174,7 @@ def connect(self) -> None:
11571174
self._redis_conn.ping()
11581175
self._is_connected = True
11591176
self._check_xautoclaim_support()
1160-
logger.debug("Redis connection established successfully")
1177+
logger.info("Redis connection established successfully")
11611178
# Start stream keys refresher when connected
11621179
self._start_stream_keys_refresh_thread()
11631180
except Exception as e:
@@ -1174,7 +1191,7 @@ def disconnect(self) -> None:
11741191
self._stop_stream_keys_refresh_thread()
11751192
if self._is_listening:
11761193
self.stop_listening()
1177-
logger.debug("Disconnected from Redis")
1194+
logger.info("Disconnected from Redis")
11781195

11791196
def __enter__(self):
11801197
"""Context manager entry."""
@@ -1379,7 +1396,7 @@ def _update_stream_cache_with_log(
13791396
self._stream_keys_cache = active_stream_keys
13801397
self._stream_keys_last_refresh = time.time()
13811398
cache_count = len(self._stream_keys_cache)
1382-
logger.info(
1399+
logger.debug(
13831400
f"Refreshed stream keys cache: {cache_count} active keys, "
13841401
f"{deleted_count} deleted, {len(candidate_keys)} candidates examined."
13851402
)

src/memos/mem_scheduler/task_schedule_modules/task_queue.py

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
9393
"""Submit messages to the message queue (either local queue or Redis)."""
9494
if isinstance(messages, ScheduleMessageItem):
9595
messages = [messages]
96+
if len(messages) < 1:
97+
logger.error("submit_messages called with empty payload")
98+
return
9699

97100
current_trace_id = get_current_trace_id()
98101

@@ -104,18 +107,25 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
104107
user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, task_label=msg.label
105108
)
106109

107-
if len(messages) < 1:
108-
logger.error("Submit empty")
109-
elif len(messages) == 1:
110+
if len(messages) == 1:
110111
if getattr(messages[0], "timestamp", None) is None:
111112
messages[0].timestamp = get_utc_now()
112-
enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
113-
emit_monitor_event(
114-
"enqueue",
115-
messages[0],
116-
{"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
117-
)
118-
self.memos_message_queue.put(messages[0])
113+
if self.disabled_handlers and messages[0].label in self.disabled_handlers:
114+
logger.debug(
115+
"Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
116+
messages[0].label,
117+
messages[0].item_id,
118+
messages[0].user_id,
119+
messages[0].mem_cube_id,
120+
)
121+
else:
122+
enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
123+
emit_monitor_event(
124+
"enqueue",
125+
messages[0],
126+
{"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
127+
)
128+
self.memos_message_queue.put(messages[0])
119129
else:
120130
user_cube_groups = group_messages_by_user_and_mem_cube(messages)
121131

@@ -132,8 +142,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
132142
message.timestamp = get_utc_now()
133143

134144
if self.disabled_handlers and message.label in self.disabled_handlers:
135-
logger.info(
136-
f"Skipping disabled handler: {message.label} - {message.content}"
145+
logger.debug(
146+
"Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
147+
message.label,
148+
message.item_id,
149+
message.user_id,
150+
message.mem_cube_id,
137151
)
138152
continue
139153

@@ -148,9 +162,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
148162
},
149163
)
150164
self.memos_message_queue.put(message)
151-
logger.info(
152-
f"Submitted message to local queue: {message.label} - {message.content}"
153-
)
165+
166+
logger.info(
167+
"Queue submit completed. backend=%s total=%s",
168+
"redis_queue" if self.use_redis_queue else "local_queue",
169+
len(messages),
170+
)
154171

155172
def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
156173
return self.memos_message_queue.get_messages(batch_size=batch_size)

src/memos/multi_mem_cube/single_cube.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
)
2323
from memos.memories.textual.item import TextualMemoryItem
2424
from memos.multi_mem_cube.views import MemCubeView
25-
from memos.search import search_text_memories
25+
from memos.search import resolve_filter_for_cube, search_text_memories
2626
from memos.templates.mem_reader_prompts import PROMPT_MAPPING
2727
from memos.types.general_types import (
2828
FINE_STRATEGY,
@@ -91,6 +91,13 @@ def search_memories(self, search_req: APISearchRequest) -> dict[str, Any]:
9191
Unified memory search handling (text + preference memories).
9292
Preference memories are now searched through the same _search_text flow.
9393
"""
94+
cube_filter = resolve_filter_for_cube(search_req.filter, self.cube_id)
95+
if cube_filter is not search_req.filter:
96+
import copy
97+
98+
search_req = copy.copy(search_req)
99+
search_req.filter = cube_filter
100+
94101
# Create UserContext object
95102
user_context = UserContext(
96103
user_id=search_req.user_id,

src/memos/search/__init__.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,14 @@
1-
from .search_service import SearchContext, build_search_context, search_text_memories
1+
from .search_service import (
2+
SearchContext,
3+
build_search_context,
4+
resolve_filter_for_cube,
5+
search_text_memories,
6+
)
27

38

4-
__all__ = ["SearchContext", "build_search_context", "search_text_memories"]
9+
__all__ = [
10+
"SearchContext",
11+
"build_search_context",
12+
"resolve_filter_for_cube",
13+
"search_text_memories",
14+
]

0 commit comments

Comments (0)