
Commit 8b30a44

refactor&fix: fix a range of bugs in the scheduler and revise the fine-add APIs (MemTensor#840)
* fix bugs: try to fix bugs in _submit_web_logs
* fix bugs: try to address bugs
* fix bugs
* refactor: modify examples
* revise the add operation and fix a surprising bug
* address the bug issues
* fix a formatting problem in the doc file
* add a range of new features for the add operation
* address the incompatibility issue in the local scheduler
* feat(scheduler): optimize Redis queue consumer group management
  - Proactively ensure consumer groups exist in '_refresh_stream_keys' for newly discovered streams.
  - Remove redundant consumer group checks in '_read_new_messages_batch' to improve read performance.
  - Clean up the 'seen_streams' cache when streams are deleted to ensure correct group recreation.
  - This change reduces unnecessary Redis calls during high-frequency polling.
* fix(tests): resolve AttributeError in SimpleStructMemReader tests
  - Import 'parse_json_result' from 'memos.mem_reader.utils' instead of accessing it as an instance attribute.
  - Fixes "AttributeError: 'SimpleStructMemReader' object has no attribute 'parse_json_result'" in 'test_parse_json_result_success' and 'test_parse_json_result_failure'.
  - Remove the incorrect mock assignment of 'parse_json_result' in 'test_process_chat_data'.
* fix(mem_reader): pass the info dict to add_before_search for correct user_id usage
  - Update the 'add_before_search' signature in 'SimpleStructMemReader' to accept an 'info' dict.
  - Pass 'info' (containing 'user_id' and 'session_id') to 'self.searcher.search' instead of using empty strings.
  - Add 'test_add_before_search' to 'TestSimpleStructMemReader' to verify the fix and ensure 'searcher.search' receives the correct 'info'.
  - This ensures that memory searches are scoped to the correct user and session.
* refactor: move add_before_search from mem_reader to SingleCubeView
* address bugs
* fix: fix the qsize bug in the task queue, and merge changes from hotfix/scheduler
* fix: address some issues so the old scheduler example and the KV cache example run
* fix: address the top-level import of the unavailable module 'torch'
* fix: resolve linting errors and make optional dependencies lazily loaded
  - Fix ambiguous characters and commented-out code in examples/mem_scheduler/quick_start_examples.py
  - Fix nested if statements in src/memos/mem_os/core.py
  - Move torch and transformers imports to method scope in src/memos/llms/hf.py to support optional dependencies
  - Update tests/llms/test_hf.py to patch the transformers module directly
* refactor: revise the rewrite prompt to improve it
* refactor: update examples
* refactor: update examples for the scheduler
* fix bugs: handle the XAUTOCLAIM command being unsupported on Redis versions before 6.2.0 by adding a manual auto-claim feature that combines XPENDING + XCLAIM
* refactor: review settings
* refactor: adjust examples so they run better for code debugging
* refactor: review slow add APIs to get better performance on Halumen
* fix bugs: address the issue where the status_tracker is still used when use_redis_queue is set to false
* refactor: allow the code to run without RabbitMQ
* refactor: create a _parse_pending_entry helper for the Redis queue
* refactor: add a try/except around status_tracker
1 parent 5f811d4 commit 8b30a44

10 files changed

Lines changed: 313 additions & 140 deletions
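The manual auto-claim fallback mentioned above (XPENDING + XCLAIM) and the new _parse_pending_entry helper can be sketched roughly as follows. This is an illustrative sketch, not the repository's code: the function names mirror the commit message, and the entry shapes reflect the two forms redis-py can yield for pending entries (raw 4-element tuples, or dicts with 'message_id', 'consumer', 'time_since_delivered', and 'times_delivered' keys).

```python
# Hypothetical sketch of normalizing XPENDING entries before reclaiming
# stale messages with XCLAIM on Redis servers that lack XAUTOCLAIM.

def parse_pending_entry(entry):
    """Normalize one pending entry to (message_id, consumer, idle_ms, deliveries)."""
    if isinstance(entry, dict):
        # redis-py's xpending_range returns dicts in newer client versions
        return (
            entry["message_id"],
            entry["consumer"],
            entry["time_since_delivered"],
            entry["times_delivered"],
        )
    # Raw replies arrive as 4-element sequences
    message_id, consumer, idle_ms, deliveries = entry
    return message_id, consumer, idle_ms, deliveries


def ids_to_claim(entries, min_idle_ms):
    """Pick message IDs that have been idle long enough to hand to XCLAIM."""
    return [
        message_id
        for message_id, _consumer, idle_ms, _deliveries in map(parse_pending_entry, entries)
        if idle_ms >= min_idle_ms
    ]
```

In the real scheduler the selected IDs would then be passed to the client's xclaim call; normalizing the entry shape first is what keeps the fallback working across redis-py versions.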


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -204,6 +204,7 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
+.trae

 # VSCode
 .vscode*

docker/.env.example

Lines changed: 1 addition & 0 deletions
@@ -123,6 +123,7 @@ API_SCHEDULER_ON=true
 API_SEARCH_WINDOW_SIZE=5
 # Specify how many rounds of previous conversations (history) to retrieve and consider during the 'hybrid search' (fast search + asynchronous fine search). This helps provide context-aware search results
 API_SEARCH_HISTORY_TURNS=5
+MEMSCHEDULER_USE_REDIS_QUEUE=false

 ## Graph / vector stores
 # Neo4j database selection mode

examples/mem_scheduler/quick_start_examples.py

Lines changed: 5 additions & 3 deletions
@@ -146,7 +146,9 @@ def kv_cache_only():

 def run_scheduler_example():
     # Load the main MOS (Memory-Oriented System) config file that enables MemScheduler
-    config = parse_yaml("./examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml")
+    config = parse_yaml(
+        f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml"
+    )
     # Pass the parsed config dict to the MOSConfig constructor to build the config object
     mos_config = MOSConfig(**config)
     # Initialize the MOS system instance with the config object
@@ -159,12 +161,12 @@ def run_scheduler_example():

     # Load the general MemCube configuration from a YAML file
     config = GeneralMemCubeConfig.from_yaml_file(
-        "./examples/data/config/mem_scheduler/mem_cube_config.yaml"
+        f"{BASE_DIR}/examples/data/config/mem_scheduler/mem_cube_config.yaml"
     )
     # Define the unique identifier of the MemCube
     mem_cube_id = "mem_cube_5"
     # Define the local storage path for the MemCube (the path includes the user ID and MemCube ID)
-    mem_cube_name_or_path = f"./outputs/mem_scheduler/{user_id}/{mem_cube_id}"
+    mem_cube_name_or_path = f"{BASE_DIR}/outputs/mem_scheduler/{user_id}/{mem_cube_id}"

     # If the path already exists, remove the old directory first
     if Path(mem_cube_name_or_path).exists():
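The hunks above swap CWD-relative "./…" paths for paths anchored to a BASE_DIR constant, so the example no longer breaks when launched from outside the repository root. A minimal sketch of the pattern, assuming BASE_DIR is derived from the script's own location (the actual definition lives elsewhere in the example script, and the directory depth here is an assumption):

```python
# Illustrative sketch: anchor paths to the file's location instead of the
# process's working directory. The parent-hop count is hypothetical.
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent  # the real script may hop further up

config_path = f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml"
```

Because config_path is absolute, parse_yaml-style loaders resolve the same file regardless of where the user runs the script from.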

examples/mem_scheduler/scheduler_for_async_tasks.py

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ def submit_tasks():
     TEST_HANDLER_LABEL = "test_handler"
     mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})

-    # 10s to restart
+    # 5s to restart
     mem_scheduler.orchestrator.tasks_min_idle_ms[TEST_HANDLER_LABEL] = 5_000

     tmp_dir = Path("./tmp")

src/memos/mem_reader/simple_struct.py

Lines changed: 1 addition & 3 deletions
@@ -614,11 +614,9 @@ def _read_memory(
         serialized_origin_memories = json.dumps(
             [one.memory for one in original_memory_group], indent=2
         )
-        revised_memory_list = self.rewrite_memories(
+        revised_memory_list = self.filter_hallucination_in_memories(
             messages=combined_messages,
             memory_list=original_memory_group,
-            user_only=os.getenv("SIMPLE_STRUCT_REWRITE_USER_ONLY", "true").lower()
-            == "false",
         )
         serialized_revised_memories = json.dumps(
             [one.memory for one in revised_memory_list], indent=2

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 6 additions & 3 deletions
@@ -225,7 +225,7 @@ def initialize_modules(
             process_llm = chat_llm

         try:
-            if redis_client:
+            if redis_client and self.use_redis_queue:
                 self.status_tracker = TaskStatusTracker(redis_client)
                 if self.dispatcher:
                     self.dispatcher.status_tracker = self.status_tracker
@@ -305,7 +305,7 @@ def status_tracker(self) -> TaskStatusTracker | None:
         available via RedisSchedulerModule. This mirrors the lazy pattern used
         by `mem_cube` so downstream modules can safely access the tracker.
         """
-        if self._status_tracker is None:
+        if self._status_tracker is None and self.use_redis_queue:
             try:
                 self._status_tracker = TaskStatusTracker(self.redis)
                 # Propagate to submodules when created lazily
@@ -314,7 +314,8 @@ def status_tracker(self) -> TaskStatusTracker | None:
                 if self.memos_message_queue:
                     self.memos_message_queue.set_status_tracker(self._status_tracker)
             except Exception as e:
-                logger.warning(f"Failed to lazily initialize status_tracker: {e}", exc_info=True)
+                logger.warning(f"Failed to lazy-initialize status_tracker: {e}", exc_info=True)
+
         return self._status_tracker

     @status_tracker.setter
@@ -869,6 +870,8 @@ def _submit_web_logs(
             messages = [messages]  # transform single message to list

         for message in messages:
+            if self.rabbitmq_config is None:
+                return
             try:
                 # Always call publish; the publisher now caches when offline and flushes after reconnect
                 logger.info(
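The hunks above gate both eager and lazy creation of the status tracker on use_redis_queue, so disabling the Redis queue no longer leaves a tracker in use. A stripped-down sketch of the resulting lazy-property-with-setter pattern (not the project's code; TaskStatusTracker construction is stood in for by a placeholder):

```python
# Illustrative sketch of a lazily built, feature-gated property with a
# setter that remains the single source of truth for the backing field.

class Scheduler:
    def __init__(self, use_redis_queue: bool):
        self.use_redis_queue = use_redis_queue
        self._status_tracker = None

    @property
    def status_tracker(self):
        # Build on first access, and only when the Redis queue is enabled
        if self._status_tracker is None and self.use_redis_queue:
            try:
                self._status_tracker = object()  # stands in for TaskStatusTracker(self.redis)
            except Exception:
                pass  # the real code logs a warning and leaves the tracker unset
        return self._status_tracker

    @status_tracker.setter
    def status_tracker(self, value):
        # The real setter also propagates the tracker to the message queue
        self._status_tracker = value
```

With this shape, callers can read scheduler.status_tracker unconditionally: it is simply None when the Redis queue is off, instead of triggering a doomed connection attempt.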

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 0 additions & 31 deletions
@@ -108,8 +108,6 @@ def __init__(
         )

         self.metrics = metrics
-        self._status_tracker: TaskStatusTracker | None = None
-        # Use setter to allow propagation and keep a single source of truth
         self.status_tracker = status_tracker
         self.submit_web_logs = submit_web_logs  # ADDED

@@ -118,35 +116,6 @@ def on_messages_enqueued(self, msgs: list[ScheduleMessageItem]) -> None:
         return
         # This is handled in BaseScheduler now

-    @property
-    def status_tracker(self) -> TaskStatusTracker | None:
-        """Lazy-initialized status tracker for the dispatcher.
-
-        If the tracker is None, attempt to initialize from the Redis-backed
-        components available to the dispatcher (queue or orchestrator).
-        """
-        if self._status_tracker is None:
-            try:
-                self._status_tracker = TaskStatusTracker(self.redis)
-                # Propagate to submodules when created lazily
-                if self.memos_message_queue:
-                    self.memos_message_queue.set_status_tracker(self._status_tracker)
-            except Exception as e:
-                logger.warning(f"Failed to lazily initialize status_tracker: {e}", exc_info=True)
-        return self._status_tracker
-
-    @status_tracker.setter
-    def status_tracker(self, value: TaskStatusTracker | None) -> None:
-        self._status_tracker = value
-        # Propagate to the queue if possible
-        try:
-            if self.memos_message_queue and hasattr(self.memos_message_queue, "status_tracker"):
-                self.memos_message_queue.status_tracker = value
-        except Exception as e:
-            logger.warning(
-                f"Failed to propagate dispatcher status_tracker to queue: {e}", exc_info=True
-            )
-
     def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem):
         """
         Create a wrapper around the handler to track task execution and capture results.
