diff --git a/evaluation/.env-example b/evaluation/.env-example index 4cb153b75..f2c20868c 100644 --- a/evaluation/.env-example +++ b/evaluation/.env-example @@ -1,3 +1,4 @@ +# memory process model MODEL="gpt-4o-mini" OPENAI_API_KEY="sk-***REDACTED***" OPENAI_BASE_URL="http://***.***.***.***:3000/v1" @@ -6,6 +7,13 @@ MEM0_API_KEY="m0-***REDACTED***" ZEP_API_KEY="z_***REDACTED***" +# response model CHAT_MODEL="gpt-4o-mini" CHAT_MODEL_BASE_URL="http://***.***.***.***:3000/v1" CHAT_MODEL_API_KEY="sk-***REDACTED***" + +MEMOS_KEY="Token mpg-xxxxx" +MEMOS_URL="https://apigw-pre.memtensor.cn/api/openmem/v1" + +MEMOBASE_API_KEY="xxxxx" +MEMOBASE_PROJECT_URL="http://xxx.xxx.xxx.xxx:8019" \ No newline at end of file diff --git a/evaluation/README.md b/evaluation/README.md index 19da665ad..16752c075 100644 --- a/evaluation/README.md +++ b/evaluation/README.md @@ -34,3 +34,21 @@ This repository provides tools and scripts for evaluating the LoCoMo dataset usi ``` โœ๏ธ For evaluating OpenAI's native memory feature with the LoCoMo dataset, please refer to the detailed guide: [OpenAI Memory on LoCoMo - Evaluation Guide](./scripts/locomo/openai_memory_locomo_eval_guide.md). + +### LongMemEval Evaluation +First prepare the dataset `longmemeval_s` from https://huggingface.co/datasets/xiaowu0162/longmemeval-cleaned +, and save it as `data/longmemeval/longmemeval_s.json` + +```bash +# Edit the configuration in ./scripts/run_lme_eval.sh +# Specify the model and memory backend you want to use (e.g., mem0, zep, etc.) 
+./scripts/run_lme_eval.sh +``` + +### prefEval Evaluation + +### personaMem Evaluation +get `questions_32k.csv` and `shared_contexts_32k.jsonl` from https://huggingface.co/datasets/bowen-upenn/PersonaMem and save them at `data/personamem/` +```bash +./scripts/run_pm_eval.sh +``` diff --git a/evaluation/scripts/PrefEval/pref_eval.py b/evaluation/scripts/PrefEval/pref_eval.py new file mode 100644 index 000000000..1ad40f24d --- /dev/null +++ b/evaluation/scripts/PrefEval/pref_eval.py @@ -0,0 +1,300 @@ +import asyncio +import aiohttp +import json +import re +import argparse +from typing import Dict, Any +from collections import Counter +from tqdm.asyncio import tqdm +import os + +API_KEY = os.getenv("OPENAI_API_KEY") +API_URL = os.getenv("OPENAI_BASE_URL") + +INPUT_FILE = "./data/prefeval/pref_memos.jsonl" +OUTPUT_FILE = "./data/prefeval/eval_pref_memos.jsonl" + + +async def call_gpt4o_mini_async(session: aiohttp.ClientSession, prompt: str) -> str: + headers = {"Content-Type": "application/json", "Authorization": f"Bearer {API_KEY}"} + payload = { + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": prompt}], + "temperature": 0, + "max_tokens": 500, + } + + try: + async with session.post(API_URL, headers=headers, json=payload, timeout=30) as response: + response.raise_for_status() + data = await response.json() + return data["choices"][0]["message"]["content"] + except Exception as e: + print(f"API call failed: {str(e)}") + return "" + + +def parse_xml_response(response: str, tag: str) -> str: + match = re.search(f"<{tag}>(.*?)</{tag}>", response, re.DOTALL) + return match.group(1).strip() if match else "" + + +async def evaluate_violate_preference_async( + session: aiohttp.ClientSession, preference: str, question: str, response: str +) -> Dict[str, str]: + prompt = f"""You will analyze a conversation between a user and an assistant, focusing on whether the assistant's response violates the user's stated preference. 
+Evaluate the response based on these criteria: + +Answer "Yes" if: + +1. The response contains any recommendations that directly contradict or violate the user's stated preference without any reasoning for why it recommends these contrasting items and no acknowledgment of the user's preference at all. + +Answer "No" if: + +1. The response respects the user's stated preference. +2. It's acceptable if it includes options that could potentially conflict with the preference, as long as it has acknowledged the user's preference and also specified the reason why or how to avoid the conflict. +3. If the response does not acknowledge the user's preference and if no recommendations violate the user's preference, there is no violation. + +Here is the conversation: +User's preference: {preference} +User query: {question} +Assistant response: {response} + +Examine the response meticulously and answer. Answer in this exact XML format: +<explanation>[1 very short sentence explanation]</explanation> +<answer>[Yes/No]</answer>""" + + api_response = await call_gpt4o_mini_async(session, prompt) + return { + "explanation": parse_xml_response(api_response, "explanation"), + "answer": parse_xml_response(api_response, "answer"), + } + + +async def evaluate_acknowledge_preference_async( + session: aiohttp.ClientSession, question: str, response: str +) -> Dict[str, str]: + prompt = f"""You will analyze a conversation between a user and an assistant, focusing on whether the assistant acknowledges any user preference in answering the user's query. + +Evaluate the response based on these stringent criteria: + +1. Check if the response explicitly or implicitly mentions or references a user preference. 2. The content of the preference is irrelevant for this check; only its presence matters. + +Answer "Yes" if: + +1. The response explicitly mentions or refers to a user preference in answering the user's question. 
Examples include: "Based on your previous preference for xxxx"; "Based on your previous preference, I would suggest you xxxx"; "Since you mentioned you prefer/dislike xxxx"; "Since you are a xxxx"; "I will recommend the following given your preference for xxx", etc. You should extract the "xxxx" in your answer. If it only says "Based on our previous conversation, I recommend..." and does not explicitly mention any preference, you should answer 'No'. +2. The response assumes the user preference in answering the user's question implicitly. For example, when the user asks 'Can you recommend me cars to drive?', if the response is 'Based on your preference, I will recommend non-EV cars, ...', then this indicates the assistant assumes that the user's preference is a dislike of EV cars, and you should answer 'Yes'. + +Answer "No" if the response does not mention or refer to any user preference explicitly or implicitly. If you cannot extract the sentence stating what the preference is, answer 'No'. + +Here is the assistant's response: +The user's question is: {question} +Assistant response: {response} + +Examine the response meticulously and answer. Please answer in this exact XML format without any additional text: +<preference>[quote of the sentence that acknowledges/mentions what the preference is; leave it blank if there is none]</preference> +<answer>[Yes/No]</answer>""" + + api_response = await call_gpt4o_mini_async(session, prompt) + return { + "preference_mention": parse_xml_response(api_response, "preference"), + "answer": parse_xml_response(api_response, "answer"), + } + + +async def evaluate_hallucinate_preference_async( + session: aiohttp.ClientSession, preference: str, restatement: str +) -> Dict[str, str]: + if not restatement.strip(): + return {"explanation": "No restatement provided by assistant", "answer": "No"} + + prompt = f"""You will analyze a conversation between a user and an assistant, focusing on whether the assistant's restatement of the user's stated preference is the same preference. 
Evaluate the response based on these stringent criteria to answer if the assistant has hallucinated the preference or not: + +Answer "Yes" if: + +1. The assistant's restatement differs from the original preference in meaning or intent. +2. The assistant's restatement contradicts the original preference. +3. The assistant's restatement is not related or irrelevant to the original preference. + +Answer "No" if has not hallucinated: + +1. The assistant's restatement matches the original preference. +2. The assistant's restatement is a minor paraphrase that fully preserves the meaning and intent of the original preference. +3. The restatement is just empty, no hallucination. + +Here is the information: +Original user preference: "{preference}" +Assistant's restatement: "{restatement}" +Examine the original preference and the assistant's restatement meticulously and answer. Please answer in this exact XML format without any other additional text: +<explanation>[1 short sentence explanation]</explanation> +<answer>[Yes/No]</answer>""" + + api_response = await call_gpt4o_mini_async(session, prompt) + return { + "explanation": parse_xml_response(api_response, "explanation"), + "answer": parse_xml_response(api_response, "answer"), + } + + +async def evaluate_helpful_response_async( + session: aiohttp.ClientSession, question: str, response: str +) -> Dict[str, str]: + prompt = f"""You will analyze a conversation between a user and an assistant, focusing on whether the assistant provides any substantive response to the user's query. +Evaluate the response based on these stringent criteria: + +Answer "Yes" if any of the following: + +1. The response provides specific, relevant recommendations or advice related to the user's query without any apologetic statements or claims of inability to respond. +2. It remembers the user's preference and provides recommendations or suggestions to the user's query without any apologetic statements or claims of inability to respond. + +Answer "No" if any of the following: + +1. 
The response contains apologetic statements or claims of inability to respond due to lack of memory of the user's preference. +2. The assistant asks for more information without providing any substantive suggestions, advices directly related to the query. +3. The response doesn't address the user's query. +4. Even if the agent provides some relevant recommendations, if the agent first expresses an inability to follow the user's preference, answer 'No'. + +Examples of unhelpful responses: (Answer: No): + +"Unfortunately, I don't actually have enough context about your specific preferences for xxx" +"Unfortunately, we haven't had a previous discussion about your preferences for xxx. Could you let me know your preference for xxx?" +"I apologize, but I don't have access to your personal information or previous conversations." +"I'm sorry, but I can't provide a specific answer without more details." + +Here is the conversation: +User query: {question} +Assistant response: {response} + +Examine the response meticulously and answer. 
Answer in this exact XML format: +<explanation>[1 very short sentence explanation]</explanation> +<answer>[Yes/No]</answer>""" + + api_response = await call_gpt4o_mini_async(session, prompt) + return { + "explanation": parse_xml_response(api_response, "explanation"), + "answer": parse_xml_response(api_response, "answer"), + } + + +def classify_error_type(evaluation_results: Dict[str, Any]) -> str: + violate = evaluation_results["violate_preference"]["answer"] + acknowledge = evaluation_results["acknowledge_preference"]["answer"] + hallucinate = evaluation_results["hallucinate_preference"]["answer"] + helpful = evaluation_results["helpful_response"]["answer"] + + if violate == "Yes" and acknowledge == "No" and helpful == "Yes": + return "Preference-Unaware Violation" + elif violate == "Yes" and acknowledge == "Yes" and hallucinate == "Yes" and helpful == "Yes": + return "Preference Hallucination Violation" + elif violate == "Yes" and acknowledge == "Yes" and hallucinate == "No" and helpful == "Yes": + return "Inconsistency Violation" + elif violate == "No" and helpful == "No": + return "Unhelpful Response" + else: + return "Unknown/No Error" + + +async def process_line( + line: str, session: aiohttp.ClientSession, semaphore: asyncio.Semaphore +) -> Dict[str, Any]: + async with semaphore: + data = json.loads(line.strip()) + preference = data["preference"] + response = data["response"] + question = data["question"] + eval2 = await evaluate_acknowledge_preference_async(session, question, response) + + tasks = [ + evaluate_violate_preference_async(session, preference, question, response), + evaluate_hallucinate_preference_async(session, preference, eval2["preference_mention"]), + evaluate_helpful_response_async(session, question, response), + ] + eval1, eval3, eval4 = await asyncio.gather(*tasks) + + evaluations = { + "violate_preference": eval1, + "acknowledge_preference": eval2, + "hallucinate_preference": eval3, + "helpful_response": eval4, + } + + result = { + "original_data": data, + "evaluations": 
evaluations, + "error_type": classify_error_type(evaluations), + } + return result + + +def log_summary(error_counter: Counter, total_samples: int) -> Dict[str, Dict[str, float]]: + summary_data = {} + print("\n--- Error Type Summary ---") + + if total_samples == 0: + print("No samples were processed.") + print("--------------------------") + return summary_data + + print(f"Total samples processed: {total_samples}") + sorted_errors = sorted(error_counter.items(), key=lambda item: item[1], reverse=True) + + for error_type, count in sorted_errors: + percentage = (count / total_samples) * 100 + summary_data[error_type] = {"count": count, "percentage": percentage} + print(f"- {error_type}: {count} ({percentage:.2f}%)") + + print("--------------------------") + print("\nProcessing complete.") + + return summary_data + + +async def main(concurrency_limit: int): + semaphore = asyncio.Semaphore(concurrency_limit) + error_counter = Counter() + + print(f"Starting evaluation with a concurrency limit of {concurrency_limit}...") + + async with aiohttp.ClientSession() as session: + try: + with open(INPUT_FILE, "r", encoding="utf-8") as f: + lines = f.readlines() + except FileNotFoundError: + print(f"Error: Input file not found at '{INPUT_FILE}'") + return + + tasks = [process_line(line, session, semaphore) for line in lines] + + with open(OUTPUT_FILE, "w", encoding="utf-8") as outfile: + pbar = tqdm( + asyncio.as_completed(tasks), + total=len(tasks), + desc="Processing samples concurrently", + unit="sample", + ) + for future in pbar: + try: + result = await future + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + + error_type = result["error_type"] + error_counter[error_type] += 1 + pbar.set_postfix({"Latest Type": error_type}) + + except Exception as e: + print(f"An error occurred while processing a line: {e}") + + summary_results = log_summary(error_counter, len(lines)) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Evaluate 
assistant responses from a JSONL file.") + parser.add_argument( + "--concurrency-limit", + type=int, + default=10, + help="The maximum number of concurrent API calls.", + ) + args = parser.parse_args() + + asyncio.run(main(concurrency_limit=args.concurrency_limit)) diff --git a/evaluation/scripts/PrefEval/pref_memos.py b/evaluation/scripts/PrefEval/pref_memos.py new file mode 100644 index 000000000..950372329 --- /dev/null +++ b/evaluation/scripts/PrefEval/pref_memos.py @@ -0,0 +1,139 @@ +import argparse +import concurrent.futures +import json +import os +import sys +import time +import tiktoken +from dotenv import load_dotenv +from openai import OpenAI +from tqdm import tqdm + +ROOT_DIR = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) +EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts") + +sys.path.insert(0, ROOT_DIR) +sys.path.insert(0, EVAL_SCRIPTS_DIR) +from utils.memos_api import MemOSAPI + +load_dotenv() + +memos_key = os.getenv("MEMOS_KEY") +memos_url = os.getenv("MEMOS_URL") +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +BASE_URL = os.getenv("OPENAI_BASE_URL") + +MODEL_NAME = "gpt-4o-mini" +INPUT_FILE = "./data/prefeval/pref_processed.jsonl" +OUTPUT_FILE = "./data/prefeval/pref_memos.jsonl" + +tokenizer = tiktoken.get_encoding("cl100k_base") + + +def process_line(line_data: tuple, mem_client: MemOSAPI, openai_client: OpenAI) -> dict | None: + """Processes a single line from the input file.""" + i, line = line_data + timestamp = int(time.time() * 1000) + user_id = f"user_line_{i}_{timestamp}" + conv_id = f"conv_line_{i}_{timestamp}" + + try: + original_data = json.loads(line) + conversation = original_data.get("conversation", []) + question = original_data.get("question") + + if not question: + original_data["response"] = "Question not found in this line." 
+ return original_data + + start_time_conv = time.monotonic() + if conversation: + mem_client.add(conversation, user_id, conv_id) + add_conversation_duration = time.monotonic() - start_time_conv + + start_time_search = time.monotonic() + relevant_memories = mem_client.search(query=question, user_id=user_id, top_k=6) + search_memories_duration = time.monotonic() - start_time_search + + memories_str = "\n".join( + f"- {entry.get('memory_value', '')}" + for entry in relevant_memories.get("memory_detail_list", []) + ) + memory_tokens_used = len(tokenizer.encode(memories_str)) + + system_prompt = f"You are a helpful AI. Answer the question based on the query and the following memories:\nUser Memories:\n{memories_str}" + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": question}, + ] + + response = openai_client.chat.completions.create(model=MODEL_NAME, messages=messages) + assistant_response = response.choices[0].message.content + original_data["response"] = assistant_response + + original_data["metrics"] = { + "add_conversation_duration_seconds": add_conversation_duration, + "search_memories_duration_seconds": search_memories_duration, + "memory_tokens_used": memory_tokens_used, + "retrieved_memories_text": memories_str, + } + return original_data + + except Exception as e: + print(f"Error processing line {i + 1} (user_id: {user_id}): {e}") + return None + + +def main(): + parser = argparse.ArgumentParser(description="Process a JSONL file using MemOS and OpenAI.") + parser.add_argument( + "--max-workers", + type=int, + default=10, + help="Maximum number of worker threads to use for concurrent processing.", + ) + args = parser.parse_args() + + max_workers = args.max_workers + + print(f"Starting concurrent processing for file: {INPUT_FILE} (Max workers: {max_workers})") + + try: + with open(INPUT_FILE, "r", encoding="utf-8") as infile: + lines = infile.readlines() + except FileNotFoundError: + print(f"Error: Input file not found 
'{INPUT_FILE}'") + return + + mem_client = MemOSAPI() + openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url=BASE_URL) + + count = 0 + with open(OUTPUT_FILE, "w", encoding="utf-8") as outfile: + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [ + executor.submit(process_line, (i, line), mem_client, openai_client) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Processing concurrently...", + ) + for future in pbar: + try: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + count += 1 + except Exception as e: + print(f"A task failed to execute: {e}") + + print(f"\nProcessing complete! Successfully wrote {count} lines to {OUTPUT_FILE}.") + + +if __name__ == "__main__": + main() diff --git a/evaluation/scripts/PrefEval/prefeval_preprocess.py b/evaluation/scripts/PrefEval/prefeval_preprocess.py new file mode 100644 index 000000000..d0938fee5 --- /dev/null +++ b/evaluation/scripts/PrefEval/prefeval_preprocess.py @@ -0,0 +1,118 @@ +from datasets import load_dataset +import json +import os + + +def convert_dataset_to_jsonl(dataset_name, output_dir="./scripts/PrefEval"): + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + try: + dataset = load_dataset(dataset_name) + except Exception as e: + print(f"Error loading dataset: {e}") + return False + + for split_name, split_data in dataset.items(): + output_file_path = os.path.join(output_dir, f"{split_name}.jsonl") + try: + split_data.to_json(output_file_path, orient="records", lines=True) + print(f"Successfully saved the '{split_name}' split to {output_file_path}") + except Exception as e: + print(f"Error saving split '{split_name}' to JSONL: {e}") + return False + + return True + + +def restructure_conversation_in_json(data): + if "conversation" not in data: + return data + + conversation_dict = data["conversation"] + conversation_list 
= [] + + try: + sorted_turn_keys = sorted(conversation_dict.keys(), key=int) + except (ValueError, TypeError): + sorted_turn_keys = sorted(conversation_dict.keys()) + + for key in sorted_turn_keys: + turn_data = conversation_dict.get(key) + if ( + turn_data + and isinstance(turn_data, dict) + and "user" in turn_data + and "assistant" in turn_data + ): + user_text = turn_data["user"] + assistant_text = turn_data["assistant"] + + conversation_list.append({"role": "user", "content": user_text}) + conversation_list.append({"role": "assistant", "content": assistant_text}) + + result_data = data.copy() + if "conversation" in result_data: + del result_data["conversation"] + result_data["conversation"] = conversation_list + + return result_data + + +def process_jsonl_file(input_filepath, output_filepath): + try: + line_count = 0 + print(f"Start processing file: {input_filepath}") + with ( + open(input_filepath, "r", encoding="utf-8") as infile, + open(output_filepath, "w", encoding="utf-8") as outfile, + ): + for line in infile: + if not line.strip(): + continue + try: + original_data = json.loads(line) + processed_data = restructure_conversation_in_json(original_data) + outfile.write(json.dumps(processed_data, ensure_ascii=False) + "\n") + line_count += 1 + if line_count % 1000 == 0: + print(f"Processed {line_count} lines...") + except json.JSONDecodeError: + print(f"Warning: Skipping malformed line: {line.strip()}") + print(f"\nProcessing completed! 
Total processed lines: {line_count}.") + print(f"Result saved to: {output_filepath}") + return True + except FileNotFoundError: + print(f"Error: Input file not found: {input_filepath}") + return False + except Exception as e: + print(f"Unknown error occurred: {e}") + return False + + +def main(): + huggingface_dataset_name = "siyanzhao/prefeval_implicit_persona" + # output_directory = "./PrefEval" + output_directory = "./data/prefeval" + input_file_path = os.path.join(output_directory, "train.jsonl") + processed_file_path = os.path.join(output_directory, "pref_processed.jsonl") + + if convert_dataset_to_jsonl(huggingface_dataset_name, output_directory): + print("Dataset download and conversion completed!") + else: + print("Dataset download and conversion failed, please check error messages.") + return + + if not os.path.exists(input_file_path): + print(f"Error: Input file '{input_file_path}' does not exist.") + return + + if process_jsonl_file(input_file_path, processed_file_path): + print("Conversation format processing completed!") + else: + print("Conversation format processing failed, please check error messages.") + return + + +if __name__ == "__main__": + main() diff --git a/evaluation/scripts/locomo/locomo_eval.py b/evaluation/scripts/locomo/locomo_eval.py index 25d2a847e..2718cc24a 100644 --- a/evaluation/scripts/locomo/locomo_eval.py +++ b/evaluation/scripts/locomo/locomo_eval.py @@ -7,6 +7,7 @@ import nltk import numpy as np +import tiktoken import transformers from bert_score import score as bert_score @@ -23,7 +24,7 @@ logging.basicConfig(level=logging.CRITICAL) transformers.logging.set_verbosity_error() - +encoding = tiktoken.get_encoding("cl100k_base") # Download necessary NLTK resources try: nltk.download("wordnet", quiet=True) @@ -173,7 +174,7 @@ def calculate_nlp_metrics(gold_answer, response, context, options=None): gold_answer = str(gold_answer) if gold_answer is not None else "" response = str(response) if response is not None else "" - metrics 
= {"context_tokens": len(nltk.word_tokenize(context)) if context else 0} + metrics = {"context_tokens": len(encoding.encode(context)) if context else 0} if "lexical" in options: gold_tokens = nltk.word_tokenize(gold_answer.lower()) @@ -363,11 +364,12 @@ async def limited_task(task): "--lib", type=str, choices=["zep", "memos", "mem0", "mem0_graph", "openai", "memos-api", "memobase"], + default="memos-api", ) parser.add_argument( "--version", type=str, - default="default", + default="0917-test", help="Version identifier for loading results (e.g., 1010)", ) parser.add_argument( @@ -376,9 +378,9 @@ async def limited_task(task): default=3, help="Number of times to run the LLM grader for each question", ) - parser.add_argument("--options", nargs="+", default=["lexical", "semantic"]) + parser.add_argument("--options", nargs="+", default=[]) parser.add_argument( - "--workers", type=int, default=4, help="Number of concurrent workers for processing groups" + "--workers", type=int, default=10, help="Number of concurrent workers for processing groups" ) args = parser.parse_args() diff --git a/evaluation/scripts/locomo/locomo_ingestion.py b/evaluation/scripts/locomo/locomo_ingestion.py index ae5e57c87..06604c233 100644 --- a/evaluation/scripts/locomo/locomo_ingestion.py +++ b/evaluation/scripts/locomo/locomo_ingestion.py @@ -1,85 +1,31 @@ +import asyncio import os import sys -import uuid - -sys.path.insert( - 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) -) -sys.path.insert( - 0, - os.path.join( - os.path.dirname( - os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - ), - "evaluation", - "scripts", - ), +ROOT_DIR = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) +EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts") + +sys.path.insert(0, ROOT_DIR) +sys.path.insert(0, EVAL_SCRIPTS_DIR) import argparse import concurrent.futures import json 
-import threading import time - from datetime import datetime, timezone - import pandas as pd - from dotenv import load_dotenv -from mem0 import MemoryClient -from memobase import ChatBlob -from tqdm import tqdm -from utils.client import memobase_client, memos_client -from zep_cloud.client import Zep - +from prompts import custom_instructions from memos.configs.mem_cube import GeneralMemCubeConfig from memos.configs.mem_os import MOSConfig from memos.mem_cube.general import GeneralMemCube from memos.mem_os.main import MOS -custom_instructions = """ -Generate personal memories that follow these guidelines: - -1. Each memory should be self-contained with complete context, including: - - The person's name, do not use "user" while creating memories - - Personal details (career aspirations, hobbies, life circumstances) - - Emotional states and reactions - - Ongoing journeys or future plans - - Specific dates when events occurred - -2. Include meaningful personal narratives focusing on: - - Identity and self-acceptance journeys - - Family planning and parenting - - Creative outlets and hobbies - - Mental health and self-care activities - - Career aspirations and education goals - - Important life events and milestones - -3. Make each memory rich with specific details rather than general statements - - Include timeframes (exact dates when possible) - - Name specific activities (e.g., "charity race for mental health" rather than just "exercise") - - Include emotional context and personal growth elements - -4. Extract memories only from user messages, not incorporating assistant responses - -5. 
Format each memory as a paragraph with a clear narrative structure that captures the person's experience, challenges, and aspirations -""" - - def get_client(frame: str, user_id: str | None = None, version: str = "default"): - if frame == "zep": - zep = Zep(api_key=os.getenv("ZEP_API_KEY"), base_url="https://api.getzep.com/api/v2") - return zep - - elif frame == "mem0" or frame == "mem0_graph": - mem0 = MemoryClient(api_key=os.getenv("MEM0_API_KEY")) - mem0.update_project(custom_instructions=custom_instructions) - return mem0 - - elif frame == "memos": + if frame == "memos": mos_config_path = "configs/mos_memos_config.json" with open(mos_config_path) as f: mos_config_data = json.load(f) @@ -110,37 +56,9 @@ def get_client(frame: str, user_id: str | None = None, version: str = "default") mem_cube_id=user_id, user_id=user_id, ) - return mos -def string_to_uuid(s: str, salt="memobase_client") -> str: - return str(uuid.uuid5(uuid.NAMESPACE_DNS, s + salt)) - - -def memobase_add_memory(user, message, retries=3): - for attempt in range(retries): - try: - _ = user.insert(ChatBlob(messages=message), sync=True) - return - except Exception as e: - if attempt < retries - 1: - time.sleep(1) - continue - else: - raise e - - -def memobase_add_memories_for_speaker(client, speaker, messages): - real_uid = string_to_uuid(speaker) - u = client.get_or_create_user(real_uid) - for i in range(0, len(messages), 2): - batch_messages = messages[i : i + 2] - memobase_add_memory(u, batch_messages) - print(f"[{i + 1}/{len(messages)}] Added messages for {speaker} successfully.") - u.flush(sync=True) - - def ingest_session(client, session, frame, version, metadata, revised_client=None): session_date = metadata["session_date"] date_format = "%I:%M %p on %d %B, %Y UTC" @@ -153,33 +71,11 @@ def ingest_session(client, session, frame, version, metadata, revised_client=Non print(f"Processing conv {conv_id}, session {metadata['session_key']}") start_time = time.time() - if frame == "zep": - for chat in 
tqdm(session, desc=f"{metadata['session_key']}"): - data = chat.get("speaker") + ": " + chat.get("text") - print({"context": data, "conv_id": conv_id, "created_at": iso_date}) - - # Check if the group exists, if not create it - groups = client.group.get_all_groups() - groups = dict(groups)["groups"] - exist_ids = [gp.group_id for gp in groups] - if conv_id not in exist_ids: - client.group.add(group_id=conv_id) - - # Add the message to the group - client.graph.add( - data=data, - type="message", - created_at=iso_date, - group_id=conv_id, - ) - - elif frame == "memos" or frame == "memos-api": + if frame == "memos" or frame == "memos-api": messages = [] messages_reverse = [] - - for chat in tqdm(session, desc=f"{metadata['session_key']}"): + for chat in session: data = chat.get("speaker") + ": " + chat.get("text") - if chat.get("speaker") == metadata["speaker_a"]: messages.append({"role": "user", "content": data, "chat_time": iso_date}) messages_reverse.append( @@ -193,22 +89,24 @@ def ingest_session(client, session, frame, version, metadata, revised_client=Non f"Unknown speaker {chat.get('speaker')} in session {metadata['session_key']}" ) - print({"context": data, "conv_id": conv_id, "created_at": iso_date}) - speaker_a_user_id = conv_id + "_speaker_a" speaker_b_user_id = conv_id + "_speaker_b" if frame == "memos-api": - client.add(messages=messages, user_id=f"{speaker_a_user_id.replace('_', '')}{version}") - - revised_client.add( - messages=messages_reverse, user_id=f"{speaker_b_user_id.replace('_', '')}{version}" + client.add( + messages=messages, + user_id=f"{speaker_a_user_id}_{version}", + conv_id=f"{conv_id}_{metadata['session_key']}", + ) + client.add( + messages=messages_reverse, + user_id=f"{speaker_b_user_id}_{version}", + conv_id=f"{conv_id}_{metadata['session_key']}", ) elif frame == "memos": client.add( messages=messages, user_id=speaker_a_user_id, ) - revised_client.add( messages=messages_reverse, user_id=speaker_b_user_id, @@ -216,13 +114,10 @@ def 
ingest_session(client, session, frame, version, metadata, revised_client=Non print(f"Added messages for {speaker_a_user_id} and {speaker_b_user_id} successfully.") elif frame == "mem0" or frame == "mem0_graph": - print(f"Processing abc for {metadata['session_key']}") messages = [] messages_reverse = [] - - for chat in tqdm(session, desc=f"{metadata['session_key']}"): + for chat in session: data = chat.get("speaker") + ": " + chat.get("text") - if chat.get("speaker") == metadata["speaker_a"]: messages.append({"role": "user", "content": data}) messages_reverse.append({"role": "assistant", "content": data}) @@ -234,8 +129,6 @@ def ingest_session(client, session, frame, version, metadata, revised_client=Non f"Unknown speaker {chat.get('speaker')} in session {metadata['session_key']}" ) - print({"context": data, "conv_id": conv_id, "created_at": iso_date}) - for i in range(0, len(messages), 2): batch_messages = messages[i : i + 2] batch_messages_reverse = messages_reverse[i : i + 2] @@ -272,13 +165,12 @@ def ingest_session(client, session, frame, version, metadata, revised_client=Non enable_graph=True, ) elif frame == "memobase": - print(f"Processing abc for {metadata['session_key']}") + from utils.memobase_utils import memobase_add_memory + messages = [] messages_reverse = [] - for chat in tqdm(session, desc=f"{metadata['session_key']}"): - data = chat.get("speaker") + ": " + chat.get("text") - + for chat in session: if chat.get("speaker") == metadata["speaker_a"]: messages.append( { @@ -318,30 +210,17 @@ def ingest_session(client, session, frame, version, metadata, revised_client=Non f"Unknown speaker {chat.get('speaker')} in session {metadata['session_key']}" ) - print({"context": data, "conv_id": conv_id, "created_at": iso_date}) - - thread_a = threading.Thread( - target=memobase_add_memories_for_speaker, - args=( - client, - metadata["speaker_a_user_id"], - messages, - ), - ) - - thread_b = threading.Thread( - target=memobase_add_memories_for_speaker, - args=( - 
client, - metadata["speaker_b_user_id"], - messages_reverse, - ), - ) - - thread_a.start() - thread_b.start() - thread_a.join() - thread_b.join() + users = client.get_all_users(limit=5000) + for u in users: + try: + if u["additional_fields"]["user_id"] == conv_id + "_speaker_a": + user_a = client.get_user(u["id"], no_get=True) + if u["additional_fields"]["user_id"] == conv_id + "_speaker_b": + user_b = client.get_user(u["id"], no_get=True) + except: + pass + memobase_add_memory(user_a, messages) + memobase_add_memory(user_b, messages_reverse) end_time = time.time() elapsed_time = round(end_time - start_time, 2) @@ -349,91 +228,86 @@ def ingest_session(client, session, frame, version, metadata, revised_client=Non return elapsed_time -def process_user(conv_idx, frame, locomo_df, version, num_workers=1): - try: - conversation = locomo_df["conversation"].iloc[conv_idx] - max_session_count = 35 - start_time = time.time() - total_session_time = 0 - valid_sessions = 0 - - revised_client = None - if frame == "zep": - client = get_client("zep") - elif frame == "mem0" or frame == "mem0_graph": - client = get_client(frame) - client.delete_all(user_id=f"locomo_exp_user_{conv_idx}") - client.delete_all(user_id=f"{conversation.get('speaker_a')}_{conv_idx}") - client.delete_all(user_id=f"{conversation.get('speaker_b')}_{conv_idx}") - elif frame == "memos": - conv_id = "locomo_exp_user_" + str(conv_idx) - speaker_a_user_id = conv_id + "_speaker_a" - speaker_b_user_id = conv_id + "_speaker_b" - client = get_client("memos", speaker_a_user_id, version) - revised_client = get_client("memos", speaker_b_user_id, version) - elif frame == "memos-api": - conv_id = "locomo_exp_user_" + str(conv_idx) - speaker_a_user_id = conv_id + "_speaker_a" - speaker_b_user_id = conv_id + "_speaker_b" - client = memos_client(mode="api") - revised_client = memos_client(mode="api") - elif frame == "memobase": - client = memobase_client() - conv_id = "locomo_exp_user_" + str(conv_idx) - speaker_a_user_id = 
conv_id + "_speaker_a" - speaker_b_user_id = conv_id + "_speaker_b" - client.delete_user(string_to_uuid(speaker_a_user_id)) - client.delete_user(string_to_uuid(speaker_b_user_id)) - sessions_to_process = [] - for session_idx in range(max_session_count): - session_key = f"session_{session_idx}" - session = conversation.get(session_key) - if session is None: - continue - - metadata = { - "session_date": conversation.get(f"session_{session_idx}_date_time") + " UTC", - "speaker_a": conversation.get("speaker_a"), - "speaker_b": conversation.get("speaker_b"), - "speaker_a_user_id": f"{conversation.get('speaker_a')}_{conv_idx}", - "speaker_b_user_id": f"{conversation.get('speaker_b')}_{conv_idx}", - "conv_idx": conv_idx, - "session_key": session_key, - } - sessions_to_process.append((session, metadata)) - valid_sessions += 1 - - print( - f"Processing {valid_sessions} sessions for user {conv_idx} with {num_workers} workers" - ) - with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: - futures = { - executor.submit( - ingest_session, client, session, frame, version, metadata, revised_client - ): metadata["session_key"] - for session, metadata in sessions_to_process - } +def process_user(conv_idx, frame, locomo_df, version): + conversation = locomo_df["conversation"].iloc[conv_idx] + max_session_count = 35 + start_time = time.time() + total_session_time = 0 + valid_sessions = 0 - for future in concurrent.futures.as_completed(futures): - session_key = futures[future] - try: - session_time = future.result() - total_session_time += session_time - print(f"User {conv_idx}, {session_key} processed in {session_time} seconds") - except Exception as e: - print(f"Error processing user {conv_idx}, session {session_key}: {e!s}") + revised_client = None + if frame == "mem0" or frame == "mem0_graph": + from mem0 import MemoryClient - end_time = time.time() - elapsed_time = round(end_time - start_time, 2) - print(f"User {conv_idx} processed successfully in 
{elapsed_time} seconds") + mem0 = MemoryClient(api_key=os.getenv("MEM0_API_KEY")) + mem0.update_project(custom_instructions=custom_instructions) + client.delete_all(user_id=f"locomo_exp_user_{conv_idx}") + client.delete_all(user_id=f"{conversation.get('speaker_a')}_{conv_idx}") + client.delete_all(user_id=f"{conversation.get('speaker_b')}_{conv_idx}") + elif frame == "memos": + conv_id = "locomo_exp_user_" + str(conv_idx) + speaker_a_user_id = conv_id + "_speaker_a" + speaker_b_user_id = conv_id + "_speaker_b" + client = get_client("memos", speaker_a_user_id, version) + revised_client = get_client("memos", speaker_b_user_id, version) + elif frame == "memos-api": + from utils.memos_api import MemOSAPI - return elapsed_time + client = MemOSAPI() + elif frame == "memobase": + from utils.client import memobase_client - except Exception as e: - return f"Error processing user {conv_idx}: {e!s}" + client = memobase_client() + conv_id = "locomo_exp_user_" + str(conv_idx) + speaker_a_user_id = conv_id + "_speaker_a" + speaker_b_user_id = conv_id + "_speaker_b" + all_users = client.get_all_users(limit=5000) + for user in all_users: + try: + if user["additional_fields"]["user_id"] in [speaker_a_user_id, speaker_b_user_id]: + client.delete_user(user["id"]) + print(f"๐Ÿ—‘๏ธ Deleted existing user from Memobase memory...") + except: + pass + memobase_user_id_a = client.add_user({"user_id": speaker_a_user_id}) + memobase_user_id_b = client.add_user({"user_id": speaker_b_user_id}) + user_id_a = memobase_user_id_a + user_id_b = memobase_user_id_b + + sessions_to_process = [] + for session_idx in range(max_session_count): + session_key = f"session_{session_idx}" + session = conversation.get(session_key) + if session is None: + continue + + metadata = { + "session_date": conversation.get(f"session_{session_idx}_date_time") + " UTC", + "speaker_a": conversation.get("speaker_a"), + "speaker_b": conversation.get("speaker_b"), + "speaker_a_user_id": 
f"{conversation.get('speaker_a')}_{conv_idx}", + "speaker_b_user_id": f"{conversation.get('speaker_b')}_{conv_idx}", + "conv_idx": conv_idx, + "session_key": session_key, + } + sessions_to_process.append((session, metadata)) + valid_sessions += 1 + + print(f"Processing {valid_sessions} sessions for user {conv_idx}") + + for session, metadata in sessions_to_process: + session_time = ingest_session(client, session, frame, version, metadata, revised_client) + total_session_time += session_time + print(f"User {conv_idx}, {metadata['session_key']} processed in {session_time} seconds") + end_time = time.time() + elapsed_time = round(end_time - start_time, 2) + print(f"User {conv_idx} processed successfully in {elapsed_time} seconds") + + return elapsed_time -def main(frame, version="default", num_workers=4): + +async def main(frame, version="default", num_workers=4): load_dotenv() locomo_df = pd.read_json("data/locomo/locomo10.json") @@ -445,24 +319,66 @@ def main(frame, version="default", num_workers=4): f"Starting processing for {num_users} users in serial mode, each user using {num_workers} workers for sessions..." ) - for user_id in range(num_users): - try: - result = process_user(user_id, frame, locomo_df, version, num_workers) - if isinstance(result, float): - total_time += result - else: - print(result) - except Exception as e: - print(f"Error processing user {user_id}: {e!s}") - - if num_users > 0: - average_time = total_time / num_users - minutes = int(average_time // 60) - seconds = int(average_time % 60) - average_time_formatted = f"{minutes} minutes and {seconds} seconds" - print( - f"The frame {frame} processed {num_users} users in average of {average_time_formatted} per user." 
- ) + if frame == "zep": + from zep_cloud.client import AsyncZep + + zep = AsyncZep(api_key=os.getenv("ZEP_API_KEY"), base_url="https://api.getzep.com/api/v2") + num_users = 10 + max_session_count = 35 + for group_idx in range(num_users): + conversation = locomo_df["conversation"].iloc[group_idx] + group_id = f"locomo_exp_user_{group_idx}" + print(group_id) + try: + await zep.group.add(group_id=group_id) + except Exception: + pass + + for session_idx in range(max_session_count): + session_key = f"session_{session_idx}" + print(session_key) + session = conversation.get(session_key) + if session is None: + continue + for msg in session: + session_date = conversation.get(f"session_{session_idx}_date_time") + " UTC" + date_format = "%I:%M %p on %d %B, %Y UTC" + date_string = datetime.strptime(session_date, date_format).replace( + tzinfo=timezone.utc + ) + iso_date = date_string.isoformat() + blip_caption = msg.get("blip_captions") + img_description = ( + f"(description of attached image: {blip_caption})" + if blip_caption is not None + else "" + ) + await zep.graph.add( + data=msg.get("speaker") + ": " + msg.get("text") + img_description, + type="message", + created_at=iso_date, + group_id=group_id, + ) + + else: + with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [ + executor.submit(process_user, user_id, frame, locomo_df, version) + for user_id in range(num_users) + ] + + for future in concurrent.futures.as_completed(futures): + session_time = future.result() + total_time += session_time + + if num_users > 0: + average_time = total_time / num_users + minutes = int(average_time // 60) + seconds = int(average_time % 60) + average_time_formatted = f"{minutes} minutes and {seconds} seconds" + print( + f"The frame {frame} processed {num_users} users in average of {average_time_formatted} per user." 
+ ) end_time = time.time() elapsed_time = round(end_time - start_time, 2) @@ -478,19 +394,20 @@ def main(frame, version="default", num_workers=4): "--lib", type=str, choices=["zep", "memos", "mem0", "mem0_graph", "memos-api", "memobase"], + default="memos-api", ) parser.add_argument( "--version", type=str, - default="default", + default="0917-test", help="Version identifier for saving results (e.g., 1010)", ) parser.add_argument( - "--workers", type=int, default=1, help="Number of parallel workers to process users" + "--workers", type=int, default=10, help="Number of parallel workers to process users" ) args = parser.parse_args() lib = args.lib version = args.version workers = args.workers - main(lib, version, workers) + asyncio.run(main(lib, version, workers)) diff --git a/evaluation/scripts/locomo/locomo_metric.py b/evaluation/scripts/locomo/locomo_metric.py index 8ee18faaf..de2fa48cc 100644 --- a/evaluation/scripts/locomo/locomo_metric.py +++ b/evaluation/scripts/locomo/locomo_metric.py @@ -10,11 +10,12 @@ "--lib", type=str, choices=["zep", "memos", "mem0", "mem0_graph", "openai", "memos-api", "memobase"], + default="memos-api", ) parser.add_argument( "--version", type=str, - default="default", + default="0917-test", help="Version identifier for loading results (e.g., 1010)", ) diff --git a/evaluation/scripts/locomo/locomo_responses.py b/evaluation/scripts/locomo/locomo_responses.py index 056b17163..eacf19171 100644 --- a/evaluation/scripts/locomo/locomo_responses.py +++ b/evaluation/scripts/locomo/locomo_responses.py @@ -24,7 +24,12 @@ async def locomo_response(frame, llm_client, context: str, question: str) -> str context=context, question=question, ) - elif frame == "memos": + elif frame == "memos" or frame == "memos-api": + prompt = ANSWER_PROMPT_MEMOS.format( + context=context, + question=question, + ) + elif frame == "memobase": prompt = ANSWER_PROMPT_MEMOS.format( context=context, question=question, @@ -112,7 +117,7 @@ async def main(frame, 
version="default"): os.makedirs("data", exist_ok=True) - print(all_responses) + # print(all_responses) with open(response_path, "w") as f: json.dump(all_responses, f, indent=2) @@ -125,11 +130,12 @@ async def main(frame, version="default"): "--lib", type=str, choices=["zep", "memos", "mem0", "mem0_graph", "openai", "memos-api", "memobase"], + default="memos-api", ) parser.add_argument( "--version", type=str, - default="default", + default="0917-test", help="Version identifier for loading results (e.g., 1010)", ) args = parser.parse_args() diff --git a/evaluation/scripts/locomo/locomo_search.py b/evaluation/scripts/locomo/locomo_search.py index e72f4594b..e81c339f2 100644 --- a/evaluation/scripts/locomo/locomo_search.py +++ b/evaluation/scripts/locomo/locomo_search.py @@ -1,21 +1,13 @@ import os import sys -import uuid - -sys.path.insert( - 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) -) -sys.path.insert( - 0, - os.path.join( - os.path.dirname( - os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - ), - "evaluation", - "scripts", - ), +ROOT_DIR = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) +EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts") + +sys.path.insert(0, ROOT_DIR) +sys.path.insert(0, EVAL_SCRIPTS_DIR) import argparse import json @@ -23,26 +15,24 @@ from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed from time import time - import pandas as pd - from dotenv import load_dotenv -from mem0 import MemoryClient from tqdm import tqdm -from utils.client import memobase_client, memos_client from utils.memos_filters import filter_memory_data -from zep_cloud.client import Zep - from memos.configs.mem_os import MOSConfig from memos.mem_os.main import MOS def get_client(frame: str, user_id: str | None = None, version: str = "default", top_k: int = 20): if frame == "zep": + from 
zep_cloud.client import Zep + zep = Zep(api_key=os.getenv("ZEP_API_KEY"), base_url="https://api.getzep.com/api/v2") return zep elif frame == "mem0" or frame == "mem0_graph": + from mem0 import MemoryClient + mem0 = MemoryClient(api_key=os.getenv("MEM0_API_KEY")) return mem0 @@ -182,7 +172,6 @@ def mem0_search(client, query, speaker_a_user_id, speaker_b_user_id, top_k=20): speaker_2_memories=json.dumps(search_speaker_b_memory, indent=4), ) - print(query, context) duration_ms = (time() - start) * 1000 return context, duration_ms @@ -214,30 +203,25 @@ def memos_search(client, query, conv_id, speaker_a, speaker_b, reversed_client=N speaker_2_memories=speaker_b_context, ) - print(query, context) duration_ms = (time() - start) * 1000 return context, duration_ms -def memos_api_search( - client, query, conv_id, speaker_a, speaker_b, top_k, version, reversed_client=None -): +def memos_api_search(client, query, conv_id, speaker_a, speaker_b, top_k, version): start = time() - speaker_a_user_id = conv_id + "_speaker_a" search_a_results = client.search( - query=query, user_id=f"{speaker_a_user_id.replace('_', '')}{version}", top_k=top_k + query=query, user_id=f"{conv_id}_speaker_a_{version}", top_k=top_k ) speaker_a_context = "" - for item in search_a_results: - speaker_a_context += f"{item}\n" + for item in search_a_results["memory_detail_list"]: + speaker_a_context += f"{item['memory_value']}\n" - speaker_b_user_id = conv_id + "_speaker_b" - search_b_results = reversed_client.search( - query=query, user_id=f"{speaker_b_user_id.replace('_', '')}{version}", top_k=top_k + search_b_results = client.search( + query=query, user_id=f"{conv_id}_speaker_b_{version}", top_k=top_k ) speaker_b_context = "" - for item in search_b_results: - speaker_b_context += f"{item}\n" + for item in search_b_results["memory_detail_list"]: + speaker_b_context += f"{item['memory_value']}\n" context = TEMPLATE_MEMOS.format( speaker_1=speaker_a, @@ -246,7 +230,6 @@ def memos_api_search( 
speaker_2_memories=speaker_b_context, ) - print(query, context) duration_ms = (time() - start) * 1000 return context, duration_ms @@ -323,7 +306,6 @@ def mem0_graph_search(client, query, speaker_a_user_id, speaker_b_user_id, top_k speaker_2_memories=json.dumps(search_speaker_b_memory, indent=4), speaker_2_graph_memories=json.dumps(search_speaker_b_graph, indent=4), ) - print(query, context) duration_ms = (time() - start) * 1000 return context, duration_ms @@ -360,11 +342,12 @@ def zep_search(client, query, group_id, top_k=20): def memobase_search( client, query, speaker_a, speaker_b, speaker_a_user_id, speaker_b_user_id, top_k=20 ): - start = time() - speaker_a_memories = memobase_search_memory( + from utils.memobase_utils import memobase_search_memory + + speaker_a_memories, t1 = memobase_search_memory( client, speaker_a_user_id, query, max_memory_context_size=top_k * 100 ) - speaker_b_memories = memobase_search_memory( + speaker_b_memories, t2 = memobase_search_memory( client, speaker_b_user_id, query, max_memory_context_size=top_k * 100 ) context = TEMPLATE_MEMOBASE.format( @@ -374,38 +357,8 @@ def memobase_search( speaker_2_user_id=speaker_b, speaker_2_memories=speaker_b_memories, ) - print(query, context) - duration_ms = (time() - start) * 1000 - return (context, duration_ms) - - -def string_to_uuid(s: str, salt="memobase_client") -> str: - return str(uuid.uuid5(uuid.NAMESPACE_DNS, s + salt)) - - -def memobase_search_memory( - client, user_id, query, max_memory_context_size, max_retries=3, retry_delay=1 -): - retries = 0 - real_uid = string_to_uuid(user_id) - u = client.get_user(real_uid, no_get=True) - - while retries < max_retries: - try: - memories = u.context( - max_token_size=max_memory_context_size, - chats=[{"role": "user", "content": query}], - event_similarity_threshold=0.2, - fill_window_with_events=True, - ) - return memories - except Exception as e: - print(f"Error during memory search: {e}") - print("Retrying...") - retries += 1 - if retries >= 
max_retries: - raise e - time.sleep(retry_delay) + duration_ms = t1 + t2 + return context, duration_ms def search_query(client, query, metadata, frame, version, reversed_client=None, top_k=20): @@ -417,6 +370,7 @@ def search_query(client, query, metadata, frame, version, reversed_client=None, if frame == "zep": context, duration_ms = zep_search(client, query, conv_id, top_k) + # sleep(0.1) elif frame == "mem0": context, duration_ms = mem0_search( client, query, speaker_a_user_id, speaker_b_user_id, top_k @@ -427,13 +381,15 @@ def search_query(client, query, metadata, frame, version, reversed_client=None, ) elif frame == "memos": context, duration_ms = memos_search( - client, query, conv_id, speaker_a, speaker_b, version, reversed_client + client, query, conv_id, speaker_a, speaker_b, reversed_client ) elif frame == "memos-api": context, duration_ms = memos_api_search( - client, query, conv_id, speaker_a, speaker_b, top_k, version, reversed_client + client, query, conv_id, speaker_a, speaker_b, top_k, version ) elif frame == "memobase": + speaker_a_user_id = conv_id + "_speaker_a" + speaker_b_user_id = conv_id + "_speaker_b" context, duration_ms = memobase_search( client, query, speaker_a, speaker_b, speaker_a_user_id, speaker_b_user_id, top_k ) @@ -484,13 +440,12 @@ def process_user(group_idx, locomo_df, frame, version, top_k=20, num_workers=1): client = get_client(frame, speaker_a_user_id, version, top_k=top_k) reversed_client = get_client(frame, speaker_b_user_id, version, top_k=top_k) elif frame == "memos-api": - speaker_a_user_id = conv_id + "_speaker_a" - speaker_b_user_id = conv_id + "_speaker_b" - client = memos_client(mode="api") - reversed_client = memos_client(mode="api") - client.user_register(user_id=f"{speaker_a_user_id.replace('_', '')}{version}") - reversed_client.user_register(user_id=f"{speaker_b_user_id.replace('_', '')}{version}") + from utils.memos_api import MemOSAPI + + client = MemOSAPI() elif frame == "memobase": + from utils.client import 
memobase_client + client = memobase_client() else: client = get_client(frame, conv_id, version) @@ -518,16 +473,6 @@ def process_qa(qa): ): result = future.result() if result: - context_preview = ( - result["context"][:20] + "..." if result["context"] else "No context" - ) - print( - { - "query": result["query"], - "context": context_preview, - "duration_ms": result["duration_ms"], - } - ) search_results[conv_id].append(result) os.makedirs(f"results/locomo/{frame}-{version}/tmp/", exist_ok=True) @@ -549,13 +494,13 @@ def main(frame, version="default", num_workers=1, top_k=20): all_search_results = defaultdict(list) for idx in range(num_users): - try: - print(f"Processing user {idx}...") - user_results = process_user(idx, locomo_df, frame, version, top_k, num_workers) - for conv_id, results in user_results.items(): - all_search_results[conv_id].extend(results) - except Exception as e: - print(f"User {idx} generated an exception: {e}") + # try: + print(f"Processing user {idx}...") + user_results = process_user(idx, locomo_df, frame, version, top_k, num_workers) + for conv_id, results in user_results.items(): + all_search_results[conv_id].extend(results) + # except Exception as e: + # print(f"User {idx} generated an exception: {e}") with open(f"results/locomo/{frame}-{version}/{frame}_locomo_search_results.json", "w") as f: json.dump(dict(all_search_results), f, indent=2) @@ -568,15 +513,16 @@ def main(frame, version="default", num_workers=1, top_k=20): "--lib", type=str, choices=["zep", "memos", "mem0", "mem0_graph", "memos-api", "memobase"], + default="mem0", ) parser.add_argument( "--version", type=str, - default="default", + default="0917-test", help="Version identifier for saving results (e.g., 1010)", ) parser.add_argument( - "--workers", type=int, default=1, help="Number of parallel workers to process users" + "--workers", type=int, default=10, help="Number of parallel workers to process users" ) parser.add_argument( "--top_k", type=int, default=20, 
help="Number of results to retrieve in search queries" diff --git a/evaluation/scripts/locomo/prompts.py b/evaluation/scripts/locomo/prompts.py index 9e080ec0f..02c6af1d8 100644 --- a/evaluation/scripts/locomo/prompts.py +++ b/evaluation/scripts/locomo/prompts.py @@ -38,26 +38,29 @@ ANSWER_PROMPT_ZEP = """ - You are an intelligent memory assistant tasked with retrieving accurate information from conversation memories. - # CONTEXT: - You have access to memories from a conversation. These memories contain - timestamped information that may be relevant to answering the question. + You have access to facts and entities from a conversation. # INSTRUCTIONS: 1. Carefully analyze all provided memories 2. Pay special attention to the timestamps to determine the answer 3. If the question asks about a specific event or fact, look for direct evidence in the memories 4. If the memories contain contradictory information, prioritize the most recent memory - 5. If there is a question about time references (like "last year", "two months ago", etc.), - calculate the actual date based on the memory timestamp. For example, if a memory from - 4 May 2022 mentions "went to India last year," then the trip occurred in 2021. - 6. Always convert relative time references to specific dates, months, or years. For example, - convert "last year" to "2022" or "two months ago" to "March 2023" based on the memory - timestamp. Ignore the reference while answering the question. - 7. Focus only on the content of the memories. Do not confuse character - names mentioned in memories with the actual users who created those memories. - 8. The answer should be less than 5-6 words. + 5. Always convert relative time references to specific dates, months, or years. + 6. Be as specific as possible when talking about people, places, and events + 7. Timestamps in memories represent the actual time the event occurred, not the time the event was mentioned in a message. 
+ + Clarification: + When interpreting memories, use the timestamp to determine when the described event happened, not when someone talked about the event. + + Example: + + Memory: (2023-03-15T16:33:00Z) I went to the vet yesterday. + Question: What day did I go to the vet? + Correct Answer: March 15, 2023 + Explanation: + Even though the phrase says "yesterday," the timestamp shows the event was recorded as happening on March 15th. Therefore, the actual vet visit happened on that date, regardless of the word "yesterday" in the text. + # APPROACH (Think step by step): 1. First, examine all memories that contain information related to the question @@ -73,8 +76,7 @@ {context} Question: {question} - Answer: - """ + Answer:""" ANSWER_PROMPT_MEMOS = """ You are a knowledgeable and helpful AI assistant. @@ -108,3 +110,31 @@ Answer: """ + +custom_instructions = """ +Generate personal memories that follow these guidelines: + +1. Each memory should be self-contained with complete context, including: + - The person's name, do not use "user" while creating memories + - Personal details (career aspirations, hobbies, life circumstances) + - Emotional states and reactions + - Ongoing journeys or future plans + - Specific dates when events occurred + +2. Include meaningful personal narratives focusing on: + - Identity and self-acceptance journeys + - Family planning and parenting + - Creative outlets and hobbies + - Mental health and self-care activities + - Career aspirations and education goals + - Important life events and milestones + +3. Make each memory rich with specific details rather than general statements + - Include timeframes (exact dates when possible) + - Name specific activities (e.g., "charity race for mental health" rather than just "exercise") + - Include emotional context and personal growth elements + +4. Extract memories only from user messages, not incorporating assistant responses + +5. 
Format each memory as a paragraph with a clear narrative structure that captures the person's experience, challenges, and aspirations +""" diff --git a/evaluation/scripts/longmemeval/lme_eval.py b/evaluation/scripts/longmemeval/lme_eval.py index 384f595be..329194e36 100644 --- a/evaluation/scripts/longmemeval/lme_eval.py +++ b/evaluation/scripts/longmemeval/lme_eval.py @@ -8,6 +8,7 @@ import nltk import numpy as np +import tiktoken import transformers from bert_score import score as bert_score @@ -25,7 +26,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.prompts import LME_JUDGE_MODEL_TEMPLATE - +encoding = tiktoken.get_encoding("cl100k_base") logging.basicConfig(level=logging.CRITICAL) transformers.logging.set_verbosity_error() @@ -133,7 +134,7 @@ def calculate_nlp_metrics(golden_answer, response, context, options=None): response = str(response) if response is not None else "" context = str(context) if context is not None else "" - metrics = {"context_tokens": len(nltk.word_tokenize(context)) if context else 0} + metrics = {"context_tokens": len(encoding.encode(context)) if context else 0} if "lexical" in options: gold_tokens = nltk.word_tokenize(golden_answer.lower()) @@ -199,7 +200,7 @@ async def process_qa( ) print("\n" + "=" * 80) - print(f"๐Ÿ” Processed User: \033[1m{user_id}\033[0m") + print(f"๐Ÿ” Processed User: {user_id}") print("-" * 80) print(f"โ“ Question: \n {question}") print("-" * 80) @@ -218,7 +219,7 @@ async def process_qa( judgments_formatted = [] for run, correct in judgments_dict.items(): - status = "\033[92mโœ“ CORRECT\033[0m" if correct else "\033[91mโœ— WRONG\033[0m" + status = "โœ“ CORRECT" if correct else "โœ— WRONG" judgments_formatted.append(f"{run}: {status}") print(f"โš–๏ธ Judgments: \n {', '.join(judgments_formatted)}") @@ -309,25 +310,21 @@ async def main(frame, version, nlp_options, num_runs=3, num_workers=5): run_scores, evaluated_count = evaluate_accuracy(lme_eval_results, num_runs) 
print("\n" + "=" * 80) - print("\033[1;36m๐Ÿ“Š EVALUATION SUMMARY\033[0m".center(80)) + print("๐Ÿ“Š EVALUATION SUMMARY".center(80)) print("=" * 80) if evaluated_count > 0: - print( - f"๐Ÿ“‹ \033[1mEvaluated:\033[0m \033[93m{evaluated_count}\033[0m responses across \033[93m{num_runs}\033[0m runs" - ) - print( - f"๐ŸŽฏ \033[1mLLM-as-a-Judge Mean Accuracy:\033[0m \033[92m{np.mean(run_scores):.4f}\033[0m" - ) - print(f"๐Ÿ” \033[1mStandard Deviation:\033[0m \033[93m{np.std(run_scores):.4f}\033[0m") + print(f"๐Ÿ“‹ Evaluated: {evaluated_count} responses across {num_runs} runs") + print(f"๐ŸŽฏ LLM-as-a-Judge Mean Accuracy: {np.mean(run_scores):.4f}") + print(f"๐Ÿ” Standard Deviation: {np.std(run_scores):.4f}") - run_scores_formatted = [f"\033[94m{round(s, 4):.4f}\033[0m" for s in run_scores] - print(f"๐Ÿ”ข \033[1mIndividual run scores:\033[0m [{', '.join(run_scores_formatted)}]") + run_scores_formatted = [f"{round(s, 4):.4f}" for s in run_scores] + print(f"๐Ÿ”ข Individual run scores: [{', '.join(run_scores_formatted)}]") else: - print("\033[91mโš ๏ธ No responses were evaluated. LLM-as-a-Judge score: N/A (0/0)\033[0m") + print("โš ๏ธ No responses were evaluated. 
LLM-as-a-Judge score: N/A (0/0)") if error_count > 0: - print(f"\033[91mโš ๏ธ Encountered {error_count} errors during processing\033[0m") + print(f"โš ๏ธ Encountered {error_count} errors during processing") print("-" * 80) @@ -336,8 +333,8 @@ async def main(frame, version, nlp_options, num_runs=3, num_workers=5): with open(judged_path, "w") as file: json.dump(lme_eval_results, file, indent=4) - print("\033[92mโœ… Evaluation completed successfully!\033[0m") - print(f"๐Ÿ“ Results saved to: \033[1;94m{judged_path}\033[0m") + print("โœ… Evaluation completed successfully!") + print(f"๐Ÿ“ Results saved to: {judged_path}") print("=" * 80 + "\n") @@ -347,23 +344,24 @@ async def main(frame, version, nlp_options, num_runs=3, num_workers=5): "--lib", type=str, choices=["mem0-local", "mem0-api", "memos-local", "zep", "memos-api", "zep", "memobase"], + default="memos-api", ) parser.add_argument( - "--version", type=str, default="v1", help="Version of the evaluation framework." + "--version", type=str, default="0923", help="Version of the evaluation framework." ) parser.add_argument( "--options", type=str, nargs="+", - default=["lexical", "semantic"], + default=["lexical"], choices=["lexical", "semantic"], help="NLP options to use for evaluation.", ) parser.add_argument( - "--num_runs", type=int, default=3, help="Number of runs for LLM-as-a-Judge evaluation." + "--num_runs", type=int, default=1, help="Number of runs for LLM-as-a-Judge evaluation." ) parser.add_argument( - "--workers", type=int, default=3, help="Number of runs for LLM-as-a-Judge evaluation." + "--workers", type=int, default=30, help="Number of runs for LLM-as-a-Judge evaluation." 
) args = parser.parse_args() diff --git a/evaluation/scripts/longmemeval/lme_ingestion.py b/evaluation/scripts/longmemeval/lme_ingestion.py index f2df0bd30..ebc9b3d7e 100644 --- a/evaluation/scripts/longmemeval/lme_ingestion.py +++ b/evaluation/scripts/longmemeval/lme_ingestion.py @@ -2,7 +2,6 @@ import os import sys - sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone @@ -10,9 +9,6 @@ import pandas as pd from tqdm import tqdm -from utils.client import mem0_client, memobase_client, memos_client, zep_client -from utils.memobase_utils import memobase_add_memory, string_to_uuid -from zep_cloud.types import Message def ingest_session(session, date, user_id, session_id, frame, client): @@ -20,7 +16,7 @@ def ingest_session(session, date, user_id, session_id, frame, client): if frame == "zep": for idx, msg in enumerate(session): print( - f"\033[90m[{frame}]\033[0m ๐Ÿ“ User \033[1;94m{user_id}\033[0m ๐Ÿ’ฌ Session \033[1;94m{session_id}\033[0m: [\033[93m{idx + 1}/{len(session)}\033[0m] Ingesting message: \033[1m{msg['role']}\033[0m - \033[96m{msg['content'][:50]}...\033[0m at \033[92m{date.isoformat()}\033[0m" + f"[{frame}] ๐Ÿ“ User {user_id} ๐Ÿ’ฌ Session {session_id}: [{idx + 1}/{len(session)}] Ingesting message: {msg['role']} - {msg['content'][:50]}... 
at {date.isoformat()}" ) client.memory.add( session_id=session_id, @@ -37,7 +33,7 @@ def ingest_session(session, date, user_id, session_id, frame, client): for idx, msg in enumerate(session): messages.append({"role": msg["role"], "content": msg["content"][:8000]}) print( - f"\033[90m[{frame}]\033[0m ๐Ÿ“ Session \033[1;94m{session_id}\033[0m: [\033[93m{idx + 1}/{len(session)}\033[0m] Reading message: \033[1m{msg['role']}\033[0m - \033[96m{msg['content'][:50]}...\033[0m at \033[92m{date.isoformat()}\033[0m" + f"[{frame}] ๐Ÿ“ Session {session_id}: [{idx + 1}/{len(session)}] Reading message: {msg['role']} - {msg['content'][:50]}... at {date.isoformat()}" ) if frame == "mem0-local": client.add( @@ -52,7 +48,7 @@ def ingest_session(session, date, user_id, session_id, frame, client): version="v2", ) print( - f"\033[90m[{frame}]\033[0m โœ… Session \033[1;94m{session_id}\033[0m: Ingested \033[93m{len(messages)}\033[0m messages at \033[92m{date.isoformat()}\033[0m" + f"[{frame}] โœ… Session {session_id}: Ingested {len(messages)} messages at {date.isoformat()}" ) elif frame == "memobase": for idx, msg in enumerate(session): @@ -63,17 +59,11 @@ def ingest_session(session, date, user_id, session_id, frame, client): "created_at": date.isoformat(), } ) - print( - f"\033[90m[{frame}]\033[0m ๐Ÿ“ User \033[1;94m{user_id}\033[0m ๐Ÿ’ฌ Session \033[1;94m{session_id}\033[0m: [\033[93m{idx + 1}/{len(session)}\033[0m] Ingesting message: \033[1m{msg['role']}\033[0m - \033[96m{msg['content'][:50]}...\033[0m at \033[92m{date.isoformat()}\033[0m" - ) + # print(f"[{frame}] ๐Ÿ“ User {user_id} ๐Ÿ’ฌ Session {session_id}: [{idx + 1}/{len(session)}] Ingesting message: {msg['role']} - {msg['content'][:50]}... 
at {date.isoformat()}") - real_uid = string_to_uuid(user_id) - user = client.get_user(real_uid) + user = client.get_user(user_id) memobase_add_memory(user, messages) - user.flash(sync=True) - print( - f"\033[90m[{frame}]\033[0m โœ… Session \033[1;94m{session_id}\033[0m: Ingested \033[93m{len(messages)}\033[0m messages at \033[92m{date.isoformat()}\033[0m" - ) + # print(f"[{frame}] โœ… Session {session_id}: Ingested {len(messages)} messages at {date.isoformat()}") elif frame == "memos-local" or frame == "memos-api": for _idx, msg in enumerate(session): messages.append( @@ -83,47 +73,42 @@ def ingest_session(session, date, user_id, session_id, frame, client): "chat_time": date.isoformat(), } ) - client.add(messages=messages, user_id=user_id) + if frame == "memos-local": + client.add(messages=messages, user_id=user_id) + client.mem_reorganizer_wait() + elif frame == "memos-api": + if messages: + client.add(messages=messages, user_id=user_id, conv_id=session_id) print( - f"\033[90m[{frame}]\033[0m โœ… Session \033[1;94m{session_id}\033[0m: Ingested \033[93m{len(messages)}\033[0m messages at \033[92m{date.isoformat()}\033[0m" + f"[{frame}] โœ… Session {session_id}: Ingested {len(messages)} messages at {date.isoformat()}" ) - client.mem_reorganizer_wait() -def ingest_conv(lme_df, version, conv_idx, frame): +def ingest_conv(lme_df, version, conv_idx, frame, success_records, f): conversation = lme_df.iloc[conv_idx] sessions = conversation["haystack_sessions"] dates = conversation["haystack_dates"] - user_id = "lme_exper_user_" + str(conv_idx) + user_id = f"lme_exper_user_{version}_{conv_idx}" print("\n" + "=" * 80) - print(f"๐Ÿ”„ \033[1;36mINGESTING CONVERSATION {conv_idx}\033[0m".center(80)) + print(f"๐Ÿ”„ [INGESTING CONVERSATION {conv_idx}".center(80)) print("=" * 80) if frame == "zep": + from utils.client import zep_client + client = zep_client() - print("๐Ÿ”Œ \033[1mUsing \033[94mZep client\033[0m \033[1mfor ingestion...\033[0m") - # Delete existing user and session if 
they exist - client.user.delete(user_id) - print(f"๐Ÿ—‘๏ธ Deleted existing user \033[93m{user_id}\033[0m from Zep memory...") - # Add user to Zep memory client.user.add(user_id=user_id) - print(f"โž• Added user \033[93m{user_id}\033[0m to Zep memory...") - elif frame == "mem0-local": - client = mem0_client(mode="local") - print("๐Ÿ”Œ \033[1mUsing \033[94mMem0 Local client\033[0m \033[1mfor ingestion...\033[0m") - # Delete existing memories for the user - client.delete_all(user_id=user_id) - print(f"๐Ÿ—‘๏ธ Deleted existing memories for user \033[93m{user_id}\033[0m...") + print(f"โž• Added user {user_id} to Zep memory...") elif frame == "mem0-api": + from utils.client import mem0_client + client = mem0_client(mode="api") - print("๐Ÿ”Œ \033[1mUsing \033[94mMem0 API client\033[0m \033[1mfor ingestion...\033[0m") - # Delete existing memories for the user - client.delete_all(user_id=user_id) - print(f"๐Ÿ—‘๏ธ Deleted existing memories for user \033[93m{user_id}\033[0m...") elif frame == "memos-local": + from utils.client import memos_client + client = memos_client( mode="local", db_name=f"lme_{frame}-{version}", @@ -134,64 +119,73 @@ def ingest_conv(lme_df, version, conv_idx, frame): mem_os_config_path="configs/mos_memos_config.json", addorsearch="add", ) - print("๐Ÿ”Œ \033[1mUsing \033[94mMemos Local client\033[0m \033[1mfor ingestion...\033[0m") + print("๐Ÿ”Œ Using Memos Local client for ingestion...") elif frame == "memos-api": - client = memos_client(mode="api") + from utils.memos_api import MemOSAPI + + client = MemOSAPI() elif frame == "memobase": + from utils.client import memobase_client + client = memobase_client() - print("๐Ÿ”Œ \033[1mUsing \033[94mMemobase client\033[0m \033[1mfor ingestion...\033[0m") - client.delete_user(string_to_uuid(user_id)) - print(f"๐Ÿ—‘๏ธ Deleted existing user \033[93m{user_id}\033[0m from Memobase memory...") + memobase_user_id = client.add_user({"user_id": user_id}) + user_id = memobase_user_id for idx, session in 
enumerate(sessions): - session_id = user_id + "_lme_exper_session_" + str(idx) - if frame == "zep": - client.memory.add_session( - user_id=user_id, - session_id=session_id, - ) - print( - f"โž• Added session \033[93m{session_id}\033[0m for user \033[93m{user_id}\033[0m to Zep memory..." - ) - - if len(session) == 0: - print(f"\033[93mโš ๏ธ Skipping empty session {idx} in conversation {conv_idx}\033[0m") - continue + if f"{conv_idx}_{idx}" not in success_records: + session_id = user_id + "_lme_exper_session_" + str(idx) + if frame == "zep": + client.memory.add_session( + user_id=user_id, + session_id=session_id, + ) + date = dates[idx] + " UTC" + date_format = "%Y/%m/%d (%a) %H:%M UTC" + date_string = datetime.strptime(date, date_format).replace(tzinfo=timezone.utc) - date = dates[idx] + " UTC" - date_format = "%Y/%m/%d (%a) %H:%M UTC" - date_string = datetime.strptime(date, date_format).replace(tzinfo=timezone.utc) - - try: - ingest_session(session, date_string, user_id, session_id, frame, client) - except Exception as e: - print(f"\033[91mโŒ Error ingesting session: {e}\033[0m") + try: + ingest_session(session, date_string, user_id, session_id, frame, client) + f.write(f"{conv_idx}_{idx}\n") + f.flush() + except Exception as e: + print(f"โŒ Error ingesting session: {e}") + else: + print(f"โœ… Session {conv_idx}_{idx} already ingested") if frame == "memos-local": client.mem_reorganizer_off() + print("=" * 80) def main(frame, version, num_workers=2): print("\n" + "=" * 80) - print(f"๐Ÿš€ \033[1;36mLONGMEMEVAL INGESTION - {frame.upper()} v{version}\033[0m".center(80)) + print(f"๐Ÿš€ LONGMEMEVAL INGESTION - {frame.upper()} v{version}".center(80)) print("=" * 80) lme_df = pd.read_json("data/longmemeval/longmemeval_s.json") - print( - "๐Ÿ“š \033[1mLoaded LongMemeval dataset\033[0m from \033[94mdata/longmemeval/longmemeval_s.json\033[0m" - ) + print("๐Ÿ“š Loaded LongMemeval dataset from data/longmemeval/longmemeval_s.json") num_multi_sessions = len(lme_df) - 
print(f"๐Ÿ‘ฅ Number of users: \033[93m{num_multi_sessions}\033[0m") + print(f"๐Ÿ‘ฅ Number of users: {num_multi_sessions}") print("-" * 80) start_time = datetime.now() + os.makedirs(f"results/lme/{frame}-{version}/", exist_ok=True) + success_records = [] + record_file = f"results/lme/{frame}-{version}/success_records.txt" + if os.path.exists(record_file): + for i in open(record_file, "r").readlines(): + success_records.append(i.strip()) + + f = open(record_file, "a+") with ThreadPoolExecutor(max_workers=num_workers) as executor: futures = [] for session_idx in range(num_multi_sessions): - future = executor.submit(ingest_conv, lme_df, version, session_idx, frame) + future = executor.submit( + ingest_conv, lme_df, version, session_idx, frame, success_records, f + ) futures.append(future) for future in tqdm( @@ -200,21 +194,17 @@ def main(frame, version, num_workers=2): try: future.result() except Exception as e: - print(f"\033[91mโŒ Error processing conversation: {e}\033[0m") + print(f"โŒ Error processing conversation: {e}") end_time = datetime.now() elapsed_time = end_time - start_time elapsed_time_str = str(elapsed_time).split(".")[0] print("\n" + "=" * 80) - print("โœ… \033[1;32mINGESTION COMPLETE\033[0m".center(80)) + print("โœ… INGESTION COMPLETE".center(80)) print("=" * 80) - print( - f"โฑ๏ธ Total time taken to ingest \033[93m{num_multi_sessions}\033[0m multi-sessions: \033[92m{elapsed_time_str}\033[0m" - ) - print( - f"๐Ÿ”„ Framework: \033[94m{frame}\033[0m | Version: \033[94m{version}\033[0m | Workers: \033[94m{num_workers}\033[0m" - ) + print(f"โฑ๏ธ Total time taken to ingest {num_multi_sessions} multi-sessions: {elapsed_time_str}") + print(f"๐Ÿ”„ Framework: {frame} | Version: {version} | Workers: {num_workers}") print("=" * 80 + "\n") @@ -224,12 +214,13 @@ def main(frame, version, num_workers=2): "--lib", type=str, choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep", "memobase"], + default="memos-api", ) parser.add_argument( - 
"--version", type=str, default="v1", help="Version of the evaluation framework." + "--version", type=str, default="0924", help="Version of the evaluation framework." ) parser.add_argument( - "--workers", type=int, default=3, help="Number of runs for LLM-as-a-Judge evaluation." + "--workers", type=int, default=20, help="Number of runs for LLM-as-a-Judge evaluation." ) args = parser.parse_args() diff --git a/evaluation/scripts/longmemeval/lme_metric.py b/evaluation/scripts/longmemeval/lme_metric.py index 69f7748e0..c9e25ae86 100644 --- a/evaluation/scripts/longmemeval/lme_metric.py +++ b/evaluation/scripts/longmemeval/lme_metric.py @@ -259,9 +259,10 @@ def calculate_scores(data, grade_path, output_path): "--lib", type=str, choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep", "memobase"], + default="memos-api", ) parser.add_argument( - "--version", type=str, default="v1", help="Version of the evaluation framework." + "--version", type=str, default="0923", help="Version of the evaluation framework." ) args = parser.parse_args() lib, version = args.lib, args.version diff --git a/evaluation/scripts/longmemeval/lme_responses.py b/evaluation/scripts/longmemeval/lme_responses.py index e1e341826..da61f23fc 100644 --- a/evaluation/scripts/longmemeval/lme_responses.py +++ b/evaluation/scripts/longmemeval/lme_responses.py @@ -45,14 +45,10 @@ def process_qa(user_id, search_result, llm_client): response_duration_ms = (time() - start) * 1000 print("\n" + "-" * 80) - print(f"๐Ÿค– Processed User: \033[1m{user_id}\033[0m") - print(f"โฑ๏ธ Duration: \033[92m{response_duration_ms:.2f} ms\033[0m") - print(f"โ“ Question: \033[93m{question}\033[0m") - print( - f"๐Ÿ’ฌ Answer: \033[96m{anwer[:150]}...\033[0m" - if len(anwer) > 150 - else f"๐Ÿ’ฌ Answer: \033[96m{anwer}\033[0m" - ) + print(f"๐Ÿค– Processed User: {user_id}") + print(f"โฑ๏ธ Duration: {response_duration_ms:.2f} ms") + print(f"โ“ Question: {question}") + print(f"๐Ÿ’ฌ Answer: {anwer[:150]}..." 
if len(anwer) > 150 else f"๐Ÿ’ฌ Answer: {anwer}") print("-" * 80) return { @@ -71,11 +67,7 @@ def process_qa(user_id, search_result, llm_client): def main(frame, version, num_workers=4): print("\n" + "=" * 80) - print( - f"๐Ÿš€ \033[1;36mLONGMEMEVAL RESPONSE GENERATION - {frame.upper()} v{version}\033[0m".center( - 80 - ) - ) + print(f"๐Ÿš€ LONGMEMEVAL RESPONSE GENERATION - {frame.upper()} v{version}".center(80)) print("=" * 80) load_dotenv() @@ -84,18 +76,16 @@ def main(frame, version, num_workers=4): api_key=os.getenv("CHAT_MODEL_API_KEY"), base_url=os.getenv("CHAT_MODEL_BASE_URL") ) - print( - f"๐Ÿ”Œ \033[1mUsing OpenAI client with model:\033[0m \033[94m{os.getenv('CHAT_MODEL')}\033[0m" - ) + print(f"๐Ÿ”Œ Using OpenAI client with model: {os.getenv('CHAT_MODEL')}") search_path = f"results/lme/{frame}-{version}/{frame}_lme_search_results.json" response_path = f"results/lme/{frame}-{version}/{frame}_lme_responses.json" - print(f"๐Ÿ“‚ \033[1mLoading search results from:\033[0m \033[94m{search_path}\033[0m") + print(f"๐Ÿ“‚ Loading search results from: {search_path}") with open(search_path) as file: lme_search_results = json.load(file) - print(f"๐Ÿ“Š \033[1mFound\033[0m \033[93m{len(lme_search_results)}\033[0m users to process") - print(f"โš™๏ธ \033[1mUsing\033[0m \033[93m{num_workers}\033[0m worker threads") + print(f"๐Ÿ“Š Found {len(lme_search_results)} users to process") + print(f"โš™๏ธ Using {num_workers} worker threads") print("-" * 80) lme_responses = {} @@ -118,25 +108,23 @@ def main(frame, version, num_workers=4): result = future.result() lme_responses[user_id] = result except Exception as exc: - print(f"\033[91mโŒ Error processing user {user_id}: {exc}\033[0m") + print(f"โŒ Error processing user {user_id}: {exc}") end_time = time() elapsed_time = end_time - start_time elapsed_sec = int(elapsed_time) print("\n" + "=" * 80) - print("โœ… \033[1;32mRESPONSE GENERATION COMPLETE\033[0m".center(80)) + print("โœ… RESPONSE GENERATION COMPLETE".center(80)) 
print("=" * 80) - print(f"โฑ๏ธ \033[1mTotal time:\033[0m \033[92m{elapsed_sec // 60}m {elapsed_sec % 60}s\033[0m") - print(f"๐Ÿ“Š \033[1mProcessed:\033[0m \033[93m{len(lme_responses)}\033[0m users") - print( - f"๐Ÿ”„ \033[1mFramework:\033[0m \033[94m{frame}\033[0m | \033[1mVersion:\033[0m \033[94m{version}\033[0m" - ) + print(f"โฑ๏ธ Total time: {elapsed_sec // 60}m {elapsed_sec % 60}s") + print(f"๐Ÿ“Š Processed: {len(lme_responses)} users") + print(f"๐Ÿ”„ Framework: {frame} | Version: {version}") with open(response_path, "w") as f: json.dump(lme_responses, f, indent=4) - print(f"๐Ÿ“ \033[1mResponses saved to:\033[0m \033[1;94m{response_path}\033[0m") + print(f"๐Ÿ“ Responses saved to: {response_path}") print("=" * 80 + "\n") @@ -146,12 +134,13 @@ def main(frame, version, num_workers=4): "--lib", type=str, choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep", "memobase"], + default="memos-api", ) parser.add_argument( - "--version", type=str, default="v1", help="Version of the evaluation framework." + "--version", type=str, default="0923", help="Version of the evaluation framework." ) parser.add_argument( - "--workers", type=int, default=3, help="Number of runs for LLM-as-a-Judge evaluation." + "--workers", type=int, default=30, help="Number of parallel worker threads for response generation." 
) args = parser.parse_args() diff --git a/evaluation/scripts/longmemeval/lme_search.py b/evaluation/scripts/longmemeval/lme_search.py index 898ab7e27..262da3312 100644 --- a/evaluation/scripts/longmemeval/lme_search.py +++ b/evaluation/scripts/longmemeval/lme_search.py @@ -16,6 +16,7 @@ from utils.client import mem0_client, memobase_client, memos_client, zep_client from utils.memobase_utils import memobase_search_memory from utils.memos_filters import filter_memory_data +from utils.memos_api import MemOSAPI from utils.prompts import ( MEM0_CONTEXT_TEMPLATE, MEM0_GRAPH_CONTEXT_TEMPLATE, @@ -126,24 +127,15 @@ def memos_search(client, user_id, query, top_k, frame="memos-local"): elif frame == "memos-api": results = client.search(query=query, user_id=user_id, top_k=top_k) - search_memories = "\n".join([f" - {item}" for item in results]) + search_memories = "\n".join( + [f" - {item['memory_value']}" for item in results["memory_detail_list"]] + ) context = MEMOS_CONTEXT_TEMPLATE.format(user_id=user_id, memories=search_memories) duration_ms = (time() - start) * 1000 return context, duration_ms -def memobase_search(client, user_id, query, top_k=20): - start = time() - memories = memobase_search_memory(client, user_id, query, max_memory_context_size=top_k * 100) - context = MEMOBASE_CONTEXT_TEMPLATE.format( - user_id=user_id, - memories=memories, - ) - duration_ms = (time() - start) * 1000 - return context, duration_ms - - def process_user(lme_df, conv_idx, frame, version, top_k=20): row = lme_df.iloc[conv_idx] question = row["question"] @@ -153,7 +145,7 @@ def process_user(lme_df, conv_idx, frame, version, top_k=20): answer = row["answer"] answer_session_ids = set(row["answer_session_ids"]) haystack_session_ids = row["haystack_session_ids"] - user_id = f"lme_exper_user_{conv_idx!s}" + user_id = f"lme_exper_user_{version}_{conv_idx}" id_to_session = dict(zip(haystack_session_ids, sessions, strict=False)) answer_sessions = [id_to_session[sid] for sid in answer_session_ids if 
sid in id_to_session] answer_evidences = [] @@ -166,29 +158,29 @@ def process_user(lme_df, conv_idx, frame, version, top_k=20): search_results = defaultdict(list) print("\n" + "-" * 80) - print(f"๐Ÿ”Ž \033[1;36m[{conv_idx + 1}/{len(lme_df)}] Processing conversation {conv_idx}\033[0m") - print(f"โ“ Question: \033[93m{question}\033[0m") - print(f"๐Ÿ“… Date: \033[92m{question_date}\033[0m") - print(f"๐Ÿท๏ธ Type: \033[94m{question_type}\033[0m") + print(f"๐Ÿ”Ž [{conv_idx + 1}/{len(lme_df)}] Processing conversation {conv_idx}") + print(f"โ“ Question: {question}") + print(f"๐Ÿ“… Date: {question_date}") + print(f"๐Ÿท๏ธ Type: {question_type}") print("-" * 80) existing_results, exists = load_existing_results(frame, version, conv_idx) if exists: - print(f"โ™ป๏ธ \033[93mUsing existing results for conversation {conv_idx}\033[0m") + print(f"โ™ป๏ธ Using existing results for conversation {conv_idx}") return existing_results if frame == "zep": client = zep_client() - print("๐Ÿ”Œ \033[1mUsing \033[94mZep client\033[0m \033[1mfor search...\033[0m") + print("๐Ÿ”Œ Using Zep client for search...") context, duration_ms = zep_search(client, user_id, question) elif frame == "mem0-local": client = mem0_client(mode="local") - print("๐Ÿ”Œ \033[1mUsing \033[94mMem0 Local client\033[0m \033[1mfor search...\033[0m") + print("๐Ÿ”Œ Using Mem0 Local client for search...") context, duration_ms = mem0_search(client, user_id, question, top_k=top_k, frame=frame) elif frame == "mem0-api": client = mem0_client(mode="api") - print("๐Ÿ”Œ \033[1mUsing \033[94mMem0 API client\033[0m \033[1mfor search...\033[0m") + print("๐Ÿ”Œ Using Mem0 API client for search...") context, duration_ms = mem0_search(client, user_id, question, top_k=top_k, frame=frame) elif frame == "memos-local": client = memos_client( @@ -201,18 +193,18 @@ def process_user(lme_df, conv_idx, frame, version, top_k=20): mem_os_config_path="configs/mos_memos_config.json", addorsearch="search", ) - print("๐Ÿ”Œ \033[1mUsing \033[94mMemos 
Local client\033[0m \033[1mfor search...\033[0m") + print("๐Ÿ”Œ Using Memos Local client for search...") context, duration_ms = memos_search(client, user_id, question, frame=frame) elif frame == "memobase": client = memobase_client() - print("๐Ÿ”Œ \033[1mUsing \033[94mMemobase client\033[0m \033[1mfor search...\033[0m") - context, duration_ms = memobase_search_memory(client, user_id, question, top_k=top_k) + print("๐Ÿ”Œ Using Memobase client for search...") + context, duration_ms = memobase_search_memory( + client, user_id, question, max_memory_context_size=3000 + ) elif frame == "memos-api": - client = memos_client( - mode="api", - ) - print("๐Ÿ”Œ \033[1mUsing \033[94mMemos API client\033[0m \033[1mfor search...\033[0m") + client = MemOSAPI() + print("๐Ÿ”Œ Using Memos API client for search...") context, duration_ms = memos_search(client, user_id, question, top_k=top_k, frame=frame) search_results[user_id].append( { @@ -231,39 +223,33 @@ def process_user(lme_df, conv_idx, frame, version, top_k=20): f"results/lme/{frame}-{version}/tmp/{frame}_lme_search_results_{conv_idx}.json", "w" ) as f: json.dump(search_results, f, indent=4) - print(f"๐Ÿ’พ \033[92mSearch results for conversation {conv_idx} saved...\033[0m") + print(f"๐Ÿ’พ Search results for conversation {conv_idx} saved...") print("-" * 80) return search_results def load_existing_results(frame, version, group_idx): - result_path = ( - f"results/locomo/{frame}-{version}/tmp/{frame}_locomo_search_results_{group_idx}.json" - ) + result_path = f"results/lme/{frame}-{version}/tmp/{frame}_lme_search_results_{group_idx}.json" if os.path.exists(result_path): try: with open(result_path) as f: return json.load(f), True except Exception as e: - print(f"\033[91mโŒ Error loading existing results for group {group_idx}: {e}\033[0m") + print(f"โŒ Error loading existing results for group {group_idx}: {e}") return {}, False def main(frame, version, top_k=20, num_workers=2): print("\n" + "=" * 80) - print(f"๐Ÿ” 
\033[1;36mLONGMEMEVAL SEARCH - {frame.upper()} v{version}\033[0m".center(80)) + print(f"๐Ÿ” LONGMEMEVAL SEARCH - {frame.upper()} v{version}".center(80)) print("=" * 80) lme_df = pd.read_json("data/longmemeval/longmemeval_s.json") - print( - "๐Ÿ“š \033[1mLoaded LongMemeval dataset\033[0m from \033[94mdata/longmemeval/longmemeval_s.json\033[0m" - ) + print("๐Ÿ“š Loaded LongMemeval dataset from data/longmemeval/longmemeval_s.json") num_multi_sessions = len(lme_df) - print(f"๐Ÿ‘ฅ Number of users: \033[93m{num_multi_sessions}\033[0m") - print( - f"โš™๏ธ Search parameters: top_k=\033[94m{top_k}\033[0m, workers=\033[94m{num_workers}\033[0m" - ) + print(f"๐Ÿ‘ฅ Number of users: {num_multi_sessions}") + print(f"โš™๏ธ Search parameters: top_k={top_k}, workers={num_workers}") print("-" * 80) all_search_results = defaultdict(list) @@ -279,32 +265,26 @@ def main(frame, version, top_k=20, num_workers=2): as_completed(future_to_idx), total=num_multi_sessions, desc="๐Ÿ“Š Processing users" ): idx = future_to_idx[future] - try: - search_results = future.result() - for user_id, results in search_results.items(): - all_search_results[user_id].extend(results) - except Exception as e: - print(f"\033[91mโŒ Error processing user {idx}: {e}\033[0m") + # try: + search_results = future.result() + for user_id, results in search_results.items(): + all_search_results[user_id].extend(results) + # except Exception as e: + # print(f"โŒ Error processing user {idx}: {e}") end_time = datetime.now() elapsed_time = end_time - start_time elapsed_time_str = str(elapsed_time).split(".")[0] print("\n" + "=" * 80) - print("โœ… \033[1;32mSEARCH COMPLETE\033[0m".center(80)) + print("โœ… SEARCH COMPLETE".center(80)) print("=" * 80) - print( - f"โฑ๏ธ Total time taken to search \033[93m{num_multi_sessions}\033[0m users: \033[92m{elapsed_time_str}\033[0m" - ) - print( - f"๐Ÿ”„ Framework: \033[94m{frame}\033[0m | Version: \033[94m{version}\033[0m | Workers: \033[94m{num_workers}\033[0m" - ) + print(f"โฑ๏ธ 
Total time taken to search {num_multi_sessions} users: {elapsed_time_str}") + print(f"๐Ÿ”„ Framework: {frame} | Version: {version} | Workers: {num_workers}") with open(f"results/lme/{frame}-{version}/{frame}_lme_search_results.json", "w") as f: json.dump(dict(all_search_results), f, indent=4) - print( - f"๐Ÿ“ Results saved to: \033[1;94mresults/lme/{frame}-{version}/{frame}_lme_search_results.json\033[0m" - ) + print(f"๐Ÿ“ Results saved to: results/lme/{frame}-{version}/{frame}_lme_search_results.json") print("=" * 80 + "\n") @@ -314,15 +294,16 @@ def main(frame, version, top_k=20, num_workers=2): "--lib", type=str, choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep", "memobase"], + default="memos-api", ) parser.add_argument( - "--version", type=str, default="v1", help="Version of the evaluation framework." + "--version", type=str, default="0923", help="Version of the evaluation framework." ) parser.add_argument( - "--top_k", type=int, default=20, help="Number of top results to retrieve from the search." + "--top_k", type=int, default=30, help="Number of top results to retrieve from the search." ) parser.add_argument( - "--workers", type=int, default=3, help="Number of runs for LLM-as-a-Judge evaluation." + "--workers", type=int, default=30, help="Number of parallel worker threads for search." 
) args = parser.parse_args() diff --git a/evaluation/scripts/personamem/pm_ingestion.py b/evaluation/scripts/personamem/pm_ingestion.py new file mode 100644 index 000000000..cd5777ec3 --- /dev/null +++ b/evaluation/scripts/personamem/pm_ingestion.py @@ -0,0 +1,249 @@ +import argparse +import os +import sys +import csv +import json + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime + +from tqdm import tqdm +from utils.client import mem0_client, memos_client, zep_client +from utils.memos_api import MemOSAPI +from zep_cloud.types import Message + + +def ingest_session(session, user_id, session_id, frame, client): + messages = [] + if frame == "zep": + pass + for idx, msg in enumerate(session): + print( + f"[{frame}] ๐Ÿ’ฌ Session [{session_id}: [{idx + 1}/{len(session)}] Ingesting message: {msg['role']} - {msg['content'][:50]}..." + ) + client.memory.add( + messages=[ + Message( + role=msg["role"], + role_type=msg["role"], + content=msg["content"], + ) + ], + ) + elif frame == "mem0-local" or frame == "mem0-api": + for idx, msg in enumerate(session): + messages.append({"role": msg["role"], "content": msg["content"]}) + print( + f"[{frame}] ๐Ÿ“ Session [{session_id}: [{idx + 1}/{len(session)}] Ingesting message: {msg['role']} - {msg['content'][:50]}..." 
+ ) + if frame == "mem0-local": + client.add(messages=messages, user_id=user_id) + elif frame == "mem0-api": + client.add( + messages=messages, + user_id=user_id, + session_id=session_id, + version="v2", + ) + print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(messages)} messages") + elif frame == "memos-local" or frame == "memos-api": + for i in range(0, len(session), 10): + messages = session[i : i + 10] + client.add(messages=messages, user_id=user_id, conv_id=session_id) + print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(messages)} messages") + + +def build_jsonl_index(jsonl_path): + """ + Scan the JSONL file once to build a mapping: {key: file_offset}. + Assumes each line is a JSON object with a single key-value pair. + """ + index = {} + with open(jsonl_path, "r", encoding="utf-8") as f: + while True: + offset = f.tell() + line = f.readline() + if not line: + break + key = next(iter(json.loads(line).keys())) + index[key] = offset + return index + + +def load_context_by_id(jsonl_path, offset): + with open(jsonl_path, "r", encoding="utf-8") as f: + f.seek(offset) + item = json.loads(f.readline()) + return next(iter(item.values())) + + +def load_rows(csv_path): + with open(csv_path, mode="r", newline="", encoding="utf-8") as csvfile: + reader = csv.DictReader(csvfile) + for _, row in enumerate(reader, start=1): + row_data = {} + for column_name, value in row.items(): + row_data[column_name] = value + yield row_data + + +def load_rows_with_context(csv_path, jsonl_path): + jsonl_index = build_jsonl_index(jsonl_path) + + with open(csv_path, mode="r", newline="", encoding="utf-8") as csvfile: + reader = csv.DictReader(csvfile) + prev_sid = None + prev_context = None + for _, row in enumerate(reader, start=1): + row_data = {} + for column_name, value in row.items(): + row_data[column_name] = value + + sid = row_data["shared_context_id"] + if sid != prev_sid: + current_context = load_context_by_id(jsonl_path, jsonl_index[sid]) + prev_sid = sid + 
prev_context = current_context + else: + current_context = prev_context + yield row_data, current_context + + +def count_csv_rows(csv_path): + with open(csv_path, mode="r", newline="", encoding="utf-8") as f: + return sum(1 for _ in f) - 1 + + +def ingest_conv(row_data, context, version, conv_idx, frame): + end_index_in_shared_context = row_data["end_index_in_shared_context"] + context = context[: int(end_index_in_shared_context)] + user_id = f"pm_exper_user_{conv_idx}_{version}" + print(f"๐Ÿ‘ค User ID: {user_id}") + print("\n" + "=" * 80) + print(f"๐Ÿ”„ INGESTING CONVERSATION {conv_idx}".center(80)) + print("=" * 80) + + if frame == "zep": + client = zep_client() + print("๐Ÿ”Œ Using Zep client for ingestion...") + client.user.delete(user_id) + print(f"๐Ÿ—‘๏ธ Deleted existing user {user_id} from Zep memory...") + client.user.add(user_id=user_id) + print(f"โž• Added user {user_id} to Zep memory...") + elif frame == "mem0-local": + client = mem0_client(mode="local") + print("๐Ÿ”Œ Using Mem0 Local client for ingestion...") + client.delete_all(user_id=user_id) + print(f"๐Ÿ—‘๏ธ Deleted existing memories for user {user_id}...") + elif frame == "mem0-api": + client = mem0_client(mode="api") + print("๐Ÿ”Œ Using Mem0 API client for ingestion...") + client.delete_all(user_id=user_id) + print(f"๐Ÿ—‘๏ธ Deleted existing memories for user {user_id}...") + elif frame == "memos-local": + client = memos_client( + mode="local", + db_name=f"pm_{frame}-{version}", + user_id=user_id, + top_k=20, + mem_cube_path=f"results/pm/{frame}-{version}/storages/{user_id}", + mem_cube_config_path="configs/mu_mem_cube_config.json", + mem_os_config_path="configs/mos_memos_config.json", + addorsearch="add", + ) + print("๐Ÿ”Œ Using Memos Local client for ingestion...") + elif frame == "memos-api": + client = MemOSAPI() + + sessions = [] + session = [] + for idx, text in enumerate(context): + if idx % 30 == 0 and idx > 0: + sessions.append(session) + session = [] + session.append(text) + if session: 
+ sessions.append(session) + + print(f"๐Ÿ“Š Total sessions to ingest: {len(sessions)}") + + for idx, session in enumerate(sessions): + ingest_session( + session=session, + user_id=user_id, + session_id=idx, + frame=frame, + client=client, + ) + print(f"โœ… Ingestion of conversation {conv_idx} completed") + print("=" * 80) + + +def main(frame, version, num_workers=2): + print("\n" + "=" * 80) + print(f"๐Ÿš€ PERSONAMEM INGESTION - {frame.upper()} v{version}".center(80)) + print("=" * 80) + + question_csv_path = "data/personamem/questions_32k.csv" + context_jsonl_path = "data/personamem/shared_contexts_32k.jsonl" + total_rows = count_csv_rows(question_csv_path) + + print(f"๐Ÿ“š Loaded PersonaMem dataset from {question_csv_path} and {context_jsonl_path}") + print("-" * 80) + + start_time = datetime.now() + + all_data = list(load_rows_with_context(question_csv_path, context_jsonl_path)) + + with ThreadPoolExecutor(max_workers=num_workers) as executor: + future_to_idx = { + executor.submit( + ingest_conv, + row_data=row_data, + context=context, + version=version, + conv_idx=idx, + frame=frame, + ): idx + for idx, (row_data, context) in enumerate(all_data) + } + + for future in tqdm( + as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations" + ): + idx = future_to_idx[future] + try: + future.result() + except Exception as exc: + print(f"\nโŒ Conversation {idx} generated an exception: {exc}") + + end_time = datetime.now() + elapsed_time = end_time - start_time + elapsed_time_str = str(elapsed_time).split(".")[0] + + print("\n" + "=" * 80) + print("โœ… INGESTION COMPLETE".center(80)) + print("=" * 80) + print(f"โฑ๏ธ Total time taken to ingest {total_rows} rows: {elapsed_time_str}") + print(f"๐Ÿ”„ Framework: {frame} | Version: {version} | Workers: {num_workers}") + print("=" * 80 + "\n") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="PersonaMem Ingestion Script") + parser.add_argument( + "--lib", + type=str, + 
choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep"], + default="memos-api", + ) + parser.add_argument( + "--version", type=str, default="0925-1", help="Version of the evaluation framework." + ) + parser.add_argument( + "--workers", type=int, default=3, help="Number of parallel workers for processing users." + ) + args = parser.parse_args() + + main(frame=args.lib, version=args.version, num_workers=args.workers) diff --git a/evaluation/scripts/personamem/pm_metric.py b/evaluation/scripts/personamem/pm_metric.py new file mode 100644 index 000000000..4c16b070a --- /dev/null +++ b/evaluation/scripts/personamem/pm_metric.py @@ -0,0 +1,387 @@ +import argparse +import json + +import numpy as np +import pandas as pd + + +def save_to_excel(results, output_path): + """Save results to Excel file""" + combined_data = [] + + # Add overall statistics row + overall_row = { + "category": "overall", + "accuracy": results["metrics"]["accuracy"], + "accuracy_std": results["metrics"]["accuracy_std"], + "total_questions": results["metrics"]["total_questions"], + "total_runs": results["metrics"]["total_runs"], + } + + # Add response duration metrics + for metric, value in results["metrics"]["response_duration"].items(): + overall_row[f"response_{metric}"] = value + + # Add search duration metrics (if exists) + if "search_duration" in results["metrics"] and results["metrics"]["search_duration"]: + for metric, value in results["metrics"]["search_duration"].items(): + overall_row[f"search_{metric}"] = value + + combined_data.append(overall_row) + + # Add category statistics rows + for category, scores in results["category_scores"].items(): + category_row = { + "category": category, + "accuracy": scores["accuracy"], + "accuracy_std": scores["accuracy_std"], + "total_questions": scores["total_questions"], + "total_runs": scores["total_runs"], + } + + # Add response duration metrics + for metric, value in scores["response_duration"].items(): + 
category_row[f"response_{metric}"] = value + + # Add search duration metrics (if exists) + if "search_duration" in scores and scores["search_duration"]: + for metric, value in scores["search_duration"].items(): + category_row[f"search_{metric}"] = value + + combined_data.append(category_row) + + # Save to Excel + df = pd.DataFrame(combined_data) + df.to_excel(output_path, sheet_name="PersonaMem_Metrics", index=False) + print(f"๐Ÿ“Š Excel file saved to: {output_path}") + + +def calculate_scores(data, grade_path, output_path): + """Calculate PersonaMem evaluation metrics""" + + # Initialize statistics variables + category_scores = {} + user_metrics = {} + + # Overall metrics - collect accuracy for each run + all_response_durations = [] + all_search_durations = [] + total_questions = 0 + + # For calculating accuracy across multiple runs + num_runs = None # Will be determined from first user's data + run_accuracies = [] # List to store accuracy for each run across all users + + # Category-wise statistics + category_response_durations = {} + category_search_durations = {} + category_run_accuracies = {} # Store accuracy for each run by category + + print(f"๐Ÿ“‹ Processing response data for {len(data)} users...") + + # First pass: determine number of runs and initialize run accuracy arrays + for user_id, user_data in data.items(): + # Skip incomplete data (users with only topic field) + if len(user_data) <= 2 and "topic" in user_data: + continue + + results = user_data.get("results", []) + if not results: + continue + + if num_runs is None: + num_runs = len(results) + run_accuracies = [[] for _ in range(num_runs)] # Initialize for each run + print(f"๐Ÿ“Š Detected {num_runs} runs per user") + break + + if num_runs is None: + print("โŒ Error: Could not determine number of runs from data") + return + + # Iterate through all user data + for user_id, user_data in data.items(): + # Skip incomplete data (users with only topic field) + if len(user_data) <= 2 and "topic" in 
user_data: + print(f"โš ๏ธ Skipping incomplete data for user {user_id}") + continue + + # Get category and results + category = user_data.get("category", "unknown") + results = user_data.get("results", []) + + if not results: + print(f"โš ๏ธ No results found for user {user_id}") + continue + + # Initialize category if not exists + if category not in category_scores: + category_scores[category] = { + "category_name": category, + "total_questions": 0, + "total_runs": 0, + "accuracy": 0.0, + "accuracy_std": 0.0, + "response_duration": {}, + "search_duration": {}, + } + category_response_durations[category] = [] + category_search_durations[category] = [] + category_run_accuracies[category] = [[] for _ in range(num_runs)] + + # Process each run for this user + user_response_durations = [] + for run_idx, result in enumerate(results): + is_correct = result.get("is_correct", False) + + # Collect accuracy for each run (1 if correct, 0 if not) + if run_idx < num_runs: + run_accuracies[run_idx].append(1.0 if is_correct else 0.0) + category_run_accuracies[category][run_idx].append(1.0 if is_correct else 0.0) + + # Collect response duration + response_duration = result.get("response_duration_ms", 0) + if response_duration > 0: + user_response_durations.append(response_duration) + all_response_durations.append(response_duration) + category_response_durations[category].append(response_duration) + + # Get search duration (usually same for all runs) + search_duration = user_data.get("search_duration_ms", 0) + if search_duration > 0: + all_search_durations.append(search_duration) + category_search_durations[category].append(search_duration) + + # Calculate user-level accuracy (average across runs) + user_correct_count = sum(1 for result in results if result.get("is_correct", False)) + user_accuracy = user_correct_count / len(results) if results else 0.0 + + # Store user-level metrics + user_metrics[user_id] = { + "user_id": user_id, + "category": category, + "question": 
user_data.get("question", ""), + "accuracy": user_accuracy, + "total_runs": len(results), + "correct_runs": user_correct_count, + "avg_response_duration_ms": np.mean(user_response_durations) + if user_response_durations + else 0.0, + "search_duration_ms": search_duration, + "golden_answer": user_data.get("golden_answer", ""), + "topic": user_data.get("topic", ""), + } + + # Count statistics + total_questions += 1 + category_scores[category]["total_questions"] += 1 + category_scores[category]["total_runs"] += len(results) + + # Calculate overall accuracy and std across runs + overall_run_accuracies = [np.mean(run_acc) for run_acc in run_accuracies if run_acc] + overall_accuracy = np.mean(overall_run_accuracies) if overall_run_accuracies else 0.0 + overall_accuracy_std = ( + np.std(overall_run_accuracies) if len(overall_run_accuracies) > 1 else 0.0 + ) + + # Calculate response duration statistics + response_duration_stats = {} + if all_response_durations: + response_duration_stats = { + "mean": np.mean(all_response_durations), + "median": np.median(all_response_durations), + "p50": np.percentile(all_response_durations, 50), + "p95": np.percentile(all_response_durations, 95), + "std": np.std(all_response_durations), + "min": np.min(all_response_durations), + "max": np.max(all_response_durations), + } + + # Calculate search duration statistics + search_duration_stats = {} + if all_search_durations: + search_duration_stats = { + "mean": np.mean(all_search_durations), + "median": np.median(all_search_durations), + "p50": np.percentile(all_search_durations, 50), + "p95": np.percentile(all_search_durations, 95), + "std": np.std(all_search_durations), + "min": np.min(all_search_durations), + "max": np.max(all_search_durations), + } + + # Calculate category-wise metrics + for category in category_scores: + # Calculate accuracy mean and std across runs for this category + cat_run_accuracies = [ + np.mean(run_acc) for run_acc in category_run_accuracies[category] if run_acc + ] 
+ category_scores[category]["accuracy"] = ( + np.mean(cat_run_accuracies) if cat_run_accuracies else 0.0 + ) + category_scores[category]["accuracy_std"] = ( + np.std(cat_run_accuracies) if len(cat_run_accuracies) > 1 else 0.0 + ) + + # Response duration statistics for this category + if category_response_durations[category]: + durations = category_response_durations[category] + category_scores[category]["response_duration"] = { + "mean": np.mean(durations), + "median": np.median(durations), + "p50": np.percentile(durations, 50), + "p95": np.percentile(durations, 95), + "std": np.std(durations), + "min": np.min(durations), + "max": np.max(durations), + } + else: + category_scores[category]["response_duration"] = { + "mean": 0.0, + "median": 0.0, + "p50": 0.0, + "p95": 0.0, + "std": 0.0, + "min": 0.0, + "max": 0.0, + } + + # Search duration statistics for this category + if category_search_durations[category]: + durations = category_search_durations[category] + category_scores[category]["search_duration"] = { + "mean": np.mean(durations), + "median": np.median(durations), + "p50": np.percentile(durations, 50), + "p95": np.percentile(durations, 95), + "std": np.std(durations), + "min": np.min(durations), + "max": np.max(durations), + } + else: + category_scores[category]["search_duration"] = { + "mean": 0.0, + "median": 0.0, + "p50": 0.0, + "p95": 0.0, + "std": 0.0, + "min": 0.0, + "max": 0.0, + } + + # Build final results + results = { + "metrics": { + "accuracy": overall_accuracy, + "accuracy_std": overall_accuracy_std, + "total_questions": total_questions, + "total_runs": total_questions * num_runs if num_runs else 0, + "response_duration": response_duration_stats, + "search_duration": search_duration_stats, + }, + "category_scores": category_scores, + "user_scores": user_metrics, + } + + # Save results to JSON file + with open(grade_path, "w") as outfile: + json.dump(results, outfile, indent=4, ensure_ascii=False) + + # Save to Excel + save_to_excel(results, 
output_path) + + # Print summary + print_summary(results) + + return results + + +def print_summary(results): + """Print evaluation results summary""" + print("\n" + "=" * 80) + print("๐Ÿ“Š PERSONAMEM EVALUATION SUMMARY".center(80)) + print("=" * 80) + + # Overall accuracy + accuracy = results["metrics"]["accuracy"] + accuracy_std = results["metrics"]["accuracy_std"] + total_questions = results["metrics"]["total_questions"] + total_runs = results["metrics"]["total_runs"] + + print(f"๐ŸŽฏ Overall Accuracy: {accuracy:.4f} ยฑ {accuracy_std:.4f}") + print(f"๐Ÿ“‹ Total Questions: {total_questions}") + print(f"๐Ÿ”„ Total Runs: {total_runs}") + + print("-" * 80) + + # Response duration statistics + if results["metrics"]["response_duration"]: + rd = results["metrics"]["response_duration"] + print("โฑ๏ธ Response Duration Stats (ms):") + print(f" Mean: {rd['mean']:.2f}") + print(f" P50: \033[96m{rd['p50']:.2f}") + print(f" P95: \033[91m{rd['p95']:.2f}") + print(f" Std Dev: {rd['std']:.2f}") + + # Search duration statistics + if results["metrics"]["search_duration"]: + sd = results["metrics"]["search_duration"] + print("๐Ÿ” Search Duration Stats (ms):") + print(f" Mean: {sd['mean']:.2f}") + print(f" P50: \033[96m{sd['p50']:.2f}") + print(f" P95: \033[91m{sd['p95']:.2f}") + print(f" Std Dev: {sd['std']:.2f}") + + print("-" * 80) + + # Category-wise accuracy + print("๐Ÿ“‚ Category-wise Accuracy:") + for category, scores in results["category_scores"].items(): + acc = scores["accuracy"] + acc_std = scores["accuracy_std"] + total_cat = scores["total_questions"] + total_runs_cat = scores["total_runs"] + print( + f" {category:<35}: {acc:.4f} ยฑ {acc_std:.4f} ({total_cat} questions, {total_runs_cat} runs)" + ) + + print("=" * 80 + "\n") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="PersonaMem evaluation metrics calculation script") + parser.add_argument( + "--lib", + type=str, + choices=["mem0-local", "mem0-api", "memos-local", "memos-api", 
"zep"], + required=True, + help="Memory library to evaluate", + default="memos-api", + ) + parser.add_argument("--version", type=str, default="0925", help="Evaluation framework version") + + args = parser.parse_args() + lib, version = args.lib, args.version + + # Define file paths + responses_path = f"results/pm/{lib}-{version}/{lib}_pm_responses.json" + grade_path = f"results/pm/{lib}-{version}/{lib}_pm_grades.json" + output_path = f"results/pm/{lib}-{version}/{lib}_pm_results.xlsx" + + print(f"๐Ÿ“‚ Loading response data from: {responses_path}") + + try: + with open(responses_path, "r", encoding="utf-8") as file: + data = json.load(file) + + # Calculate metrics + results = calculate_scores(data, grade_path, output_path) + + print(f"๐Ÿ“ Results saved to: {grade_path}") + print(f"๐Ÿ“Š Excel report saved to: {output_path}") + + except FileNotFoundError: + print(f"โŒ Error: File not found {responses_path}") + print("Please make sure to run pm_responses.py first to generate response data") + except Exception as e: + print(f"โŒ Error occurred during processing: {e}") diff --git a/evaluation/scripts/personamem/pm_responses.py b/evaluation/scripts/personamem/pm_responses.py new file mode 100644 index 000000000..8cff00dd8 --- /dev/null +++ b/evaluation/scripts/personamem/pm_responses.py @@ -0,0 +1,204 @@ +import argparse +import json +import os +import sys + +from concurrent.futures import ThreadPoolExecutor, as_completed +from time import time + +from dotenv import load_dotenv +from openai import OpenAI +from tqdm import tqdm + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from utils.prompts import PM_ANSWER_PROMPT + +import re + + +def extract_choice_answer(predicted_answer, correct_answer): + def _extract_only_options(text): + text = text.lower() + in_parens = re.findall(r"\(([a-d])\)", text) + if in_parens: + return set(in_parens) + else: + return set(re.findall(r"\b([a-d])\b", text)) + + correct = correct_answer.lower().strip("() ") 
+
+    full_response = predicted_answer
+    predicted_answer = predicted_answer.strip()
+
+    if "<answer>" in predicted_answer:
+        predicted_answer = predicted_answer.split("<answer>")[-1].strip()
+        if predicted_answer.endswith("</answer>"):
+            predicted_answer = predicted_answer[: -len("</answer>")].strip()
+
+    pred_options = _extract_only_options(predicted_answer)
+
+    if pred_options == {correct}:
+        return True, predicted_answer
+
+    response_options = _extract_only_options(full_response)
+    if response_options == {correct}:
+        return True, predicted_answer
+
+    return False, predicted_answer
+
+
+def pm_response(llm_client, context, question, options):
+    prompt = PM_ANSWER_PROMPT.format(
+        question=question,
+        context=context,
+        options=options,
+    )
+    response = llm_client.chat.completions.create(
+        model=os.getenv("CHAT_MODEL"),
+        messages=[
+            {"role": "system", "content": prompt},
+        ],
+        temperature=0,
+    )
+    result = response.choices[0].message.content or ""
+
+    return result
+
+
+def process_qa(user_id, search_result, num_runs, llm_client):
+    search_result = search_result[0]
+    question = search_result.get("question")
+    context = search_result.get("search_context", "")
+    options = search_result.get("all_options", [])
+
+    run_results = []
+
+    for idx in range(num_runs):
+        start = time()
+        answer = pm_response(llm_client, context, question, options)
+        is_correct, answer = extract_choice_answer(answer, search_result.get("golden_answer", ""))
+        response_duration_ms = (time() - start) * 1000
+
+        run_results.append(
+            {
+                "run_id": idx + 1,
+                "answer": answer,
+                "is_correct": is_correct,
+                "response_duration_ms": response_duration_ms,
+            }
+        )
+
+    response_duration_ms = sum(result["response_duration_ms"] for result in run_results) / num_runs
+
+    print("\n" + "-" * 80)
+    print(f"🤖 Processed User: {user_id}")
+    print(f"⏱️ Duration: {response_duration_ms:.2f} ms")
+    print(f"❓ Question: {question}")
+    print(f"💡 Golden Answer: {search_result.get('golden_answer', 'N/A')}")
+    for idx, result in 
enumerate(run_results, start=1): + print(f"\n๐Ÿ”„ Run {idx}/{num_runs}:") + print( + f"๐Ÿ’ฌ Run Answer: {result['answer'][:150]}..." + if len(result["answer"]) > 150 + else f"๐Ÿ’ฌ Run Answer: {result['answer']}" + ) + print(f"โœ… Run Is Correct: {result['is_correct']}") + print(f"โฑ๏ธ Run Duration: {result['response_duration_ms']:.2f} ms") + print("-" * 80) + + return { + "user_id": user_id, + "category": search_result.get("category"), + "question": question, + "results": run_results, + "golden_answer": search_result.get("golden_answer"), + "all_options": search_result.get("all_options", []), + "response_duration_ms": response_duration_ms, + "search_context": context, + "search_duration_ms": search_result.get("search_duration_ms"), + "topic": search_result.get("topic"), + } + + +def main(frame, version, num_runs=3, num_workers=4): + print("\n" + "=" * 80) + print(f"๐Ÿš€ PERSONAMEM RESPONSE GENERATION - {frame.upper()} v{version}".center(80)) + print("=" * 80) + + load_dotenv() + + oai_client = OpenAI( + api_key=os.getenv("CHAT_MODEL_API_KEY"), base_url=os.getenv("CHAT_MODEL_BASE_URL") + ) + print(f"๐Ÿ”Œ Using OpenAI client with model: {os.getenv('CHAT_MODEL')}") + + search_path = f"results/pm/{frame}-{version}/{frame}_pm_search_results.json" + response_path = f"results/pm/{frame}-{version}/{frame}_pm_responses.json" + + print(f"๐Ÿ“‚ Loading search results from: {search_path}") + with open(search_path) as file: + pm_search_results = json.load(file) + print(f"๐Ÿ“Š Found {len(pm_search_results)} users to process") + print(f"โš™๏ธ Using {num_workers} worker threads") + print("-" * 80) + + pm_responses = {} + start_time = time() + + with ThreadPoolExecutor(max_workers=num_workers) as executor: + future_to_user_id = {} + + for user_id, search_results in pm_search_results.items(): + future = executor.submit(process_qa, user_id, search_results, num_runs, oai_client) + future_to_user_id[future] = user_id + + for future in tqdm( + as_completed(future_to_user_id), + 
total=len(future_to_user_id), + desc="๐Ÿ“ Generating responses", + ): + user_id = future_to_user_id[future] + try: + result = future.result() + pm_responses[user_id] = result + except Exception as exc: + print(f"\033[91mโŒ Error processing user {user_id}: {exc}") + + end_time = time() + elapsed_time = end_time - start_time + elapsed_sec = int(elapsed_time) + + print("\n" + "=" * 80) + print("โœ… RESPONSE GENERATION COMPLETE".center(80)) + print("=" * 80) + print(f"โฑ๏ธ Total time: {elapsed_sec // 60}m {elapsed_sec % 60}s") + print(f"๐Ÿ“Š Processed: {len(pm_responses)} users") + print(f"๐Ÿ”„ Framework: {frame} | Version: {version}") + + with open(response_path, "w") as f: + json.dump(pm_responses, f, indent=4) + + print(f"๐Ÿ“ Responses saved to: \033[1;94m{response_path}") + print("=" * 80 + "\n") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="PersonaMem Response Generation Script") + parser.add_argument( + "--lib", + type=str, + choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep"], + default="memos-api", + ) + parser.add_argument( + "--version", type=str, default="0925", help="Version of the evaluation framework." + ) + parser.add_argument( + "--num_runs", type=int, default=3, help="Number of runs for LLM-as-a-Judge evaluation." + ) + parser.add_argument( + "--workers", type=int, default=3, help="Number of worker threads to use for processing." 
+ ) + + args = parser.parse_args() + main(frame=args.lib, version=args.version, num_runs=args.num_runs, num_workers=args.workers) diff --git a/evaluation/scripts/personamem/pm_search.py b/evaluation/scripts/personamem/pm_search.py new file mode 100644 index 000000000..81515e2cb --- /dev/null +++ b/evaluation/scripts/personamem/pm_search.py @@ -0,0 +1,374 @@ +import argparse +import json +import os +import sys + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime +from time import time + +import csv + +from tqdm import tqdm +from utils.client import mem0_client, memos_client, zep_client +from utils.memos_api import MemOSAPI +from utils.memos_filters import filter_memory_data +from utils.prompts import ( + MEM0_CONTEXT_TEMPLATE, + MEM0_GRAPH_CONTEXT_TEMPLATE, + MEMOS_CONTEXT_TEMPLATE, + ZEP_CONTEXT_TEMPLATE, +) + + +def zep_search(client, user_id, query, top_k=20): + start = time() + nodes_result = client.graph.search( + query=query, + user_id=user_id, + scope="nodes", + reranker="rrf", + limit=top_k, + ) + edges_result = client.graph.search( + query=query, + user_id=user_id, + scope="edges", + reranker="cross_encoder", + limit=top_k, + ) + + nodes = nodes_result.nodes + edges = edges_result.edges + + facts = [f" - {edge.fact} (event_time: {edge.valid_at})" for edge in edges] + entities = [f" - {node.name}: {node.summary}" for node in nodes] + context = ZEP_CONTEXT_TEMPLATE.format(facts="\n".join(facts), entities="\n".join(entities)) + + duration_ms = (time() - start) * 1000 + + return context, duration_ms + + +def mem0_search(client, user_id, query, top_k=20, enable_graph=False, frame="mem0-api"): + start = time() + + if frame == "mem0-local": + results = client.search( + query=query, + user_id=user_id, + top_k=top_k, + ) + search_memories = "\n".join( + [ + f" - {item['memory']} (date: 
{item['metadata']['timestamp']})" + for item in results["results"] + ] + ) + search_graph = ( + "\n".join( + [ + f" - 'source': {item.get('source', '?')} -> 'target': {item.get('destination', '?')} (relationship: {item.get('relationship', '?')})" + for item in results.get("relations", []) + ] + ) + if enable_graph + else "" + ) + + elif frame == "mem0-api": + results = client.search( + query=query, + user_id=user_id, + top_k=top_k, + version="v2", + output_format="v1.1", + enable_graph=enable_graph, + filters={"AND": [{"user_id": user_id}, {"run_id": "*"}]}, + ) + search_memories = "\n".join( + [f" - {item['memory']} (date: {item['created_at']})" for item in results["results"]] + ) + search_graph = ( + "\n".join( + [ + f" - 'source': {item.get('source', '?')} -> 'target': {item.get('target', '?')} (relationship: {item.get('relationship', '?')})" + for item in results.get("relations", []) + ] + ) + if enable_graph + else "" + ) + if enable_graph: + context = MEM0_GRAPH_CONTEXT_TEMPLATE.format( + user_id=user_id, memories=search_memories, relations=search_graph + ) + else: + context = MEM0_CONTEXT_TEMPLATE.format(user_id=user_id, memories=search_memories) + duration_ms = (time() - start) * 1000 + return context, duration_ms + + +def memos_search(client, user_id, query, top_k, frame="memos-local"): + start = time() + if frame == "memos-local": + results = client.search( + query=query, + user_id=user_id, + ) + + results = filter_memory_data(results)["text_mem"][0]["memories"] + search_memories = "\n".join([f" - {item['memory']}" for item in results]) + + elif frame == "memos-api": + results = client.search(query=query, user_id=user_id, top_k=top_k) + search_memories = "\n".join( + f"- {entry.get('memory_value', '')}" for entry in results.get("memory_detail_list", []) + ) + context = MEMOS_CONTEXT_TEMPLATE.format(user_id=user_id, memories=search_memories) + + duration_ms = (time() - start) * 1000 + return context, duration_ms + + +def build_jsonl_index(jsonl_path): + 
""" + Scan the JSONL file once to build a mapping: {key: file_offset}. + Assumes each line is a JSON object with a single key-value pair. + """ + index = {} + with open(jsonl_path, "r", encoding="utf-8") as f: + while True: + offset = f.tell() + line = f.readline() + if not line: + break + key = next(iter(json.loads(line).keys())) + index[key] = offset + return index + + +def load_context_by_id(jsonl_path, offset): + with open(jsonl_path, "r", encoding="utf-8") as f: + f.seek(offset) + item = json.loads(f.readline()) + return next(iter(item.values())) + + +def load_rows(csv_path): + with open(csv_path, mode="r", newline="", encoding="utf-8") as csvfile: + reader = csv.DictReader(csvfile) + for _, row in enumerate(reader, start=1): + row_data = {} + for column_name, value in row.items(): + row_data[column_name] = value + yield row_data + + +def load_rows_with_context(csv_path, jsonl_path): + jsonl_index = build_jsonl_index(jsonl_path) + + with open(csv_path, mode="r", newline="", encoding="utf-8") as csvfile: + reader = csv.DictReader(csvfile) + prev_sid = None + prev_context = None + + for _, row in enumerate(reader, start=1): + row_data = {} + for column_name, value in row.items(): + row_data[column_name] = value + + sid = row_data["shared_context_id"] + if sid != prev_sid: + current_context = load_context_by_id(jsonl_path, jsonl_index[sid]) + + prev_sid = sid + prev_context = current_context + else: + current_context = prev_context + + yield row_data, current_context + + +def count_csv_rows(csv_path): + with open(csv_path, mode="r", newline="", encoding="utf-8") as f: + return sum(1 for _ in f) - 1 + + +def process_user(row_data, conv_idx, frame, version, top_k=20): + persona_id = row_data["persona_id"] + question_id = row_data["question_id"] + question_type = row_data["question_type"] + topic = row_data["topic"] + question = row_data["user_question_or_message"] + correct_answer = row_data["correct_answer"] + all_options = row_data["all_options"] + user_id = 
f"pm_exper_user_{conv_idx}_{version}" + print(f"\n๐Ÿ” Processing conversation {conv_idx} for user {user_id}...") + + search_results = defaultdict(list) + print("\n" + "-" * 80) + print(f"๐Ÿ”Ž [{conv_idx + 1}/589] Processing conversation {conv_idx}") + print(f"โ“ Question: {question}") + print(f"๐Ÿ—‚๏ธ Options: {all_options}") + print(f"๐Ÿท๏ธ Type: {question_type}") + print("-" * 80) + + existing_results, exists = load_existing_results(frame, version, conv_idx) + if exists: + print(f"โ™ป๏ธ Using existing results for conversation {conv_idx}") + return existing_results + + if frame == "zep": + client = zep_client() + print("๐Ÿ”Œ Using Zep client for search...") + context, duration_ms = zep_search(client, user_id, question) + + elif frame == "mem0-local": + client = mem0_client(mode="local") + print("๐Ÿ”Œ Using Mem0 Local client for search...") + context, duration_ms = mem0_search(client, user_id, question, top_k=top_k, frame=frame) + elif frame == "mem0-api": + client = mem0_client(mode="api") + print("๐Ÿ”Œ Using Mem0 API client for search...") + context, duration_ms = mem0_search(client, user_id, question, top_k=top_k, frame=frame) + elif frame == "memos-local": + client = memos_client( + mode="local", + db_name=f"pm_{frame}-{version}", + user_id=user_id, + top_k=top_k, + mem_cube_path=f"results/pm/{frame}-{version}/storages/{user_id}", + mem_cube_config_path="configs/mu_mem_cube_config.json", + mem_os_config_path="configs/mos_memos_config.json", + addorsearch="search", + ) + print("๐Ÿ”Œ Using Memos Local client for search...") + context, duration_ms = memos_search(client, user_id, question, frame=frame) + elif frame == "memos-api": + client = MemOSAPI() + print("๐Ÿ”Œ Using Memos API client for search...") + context, duration_ms = memos_search(client, user_id, question, top_k=top_k, frame=frame) + + search_results[user_id].append( + { + "user_id": user_id, + "question": question, + "category": question_type, + "persona_id": persona_id, + "question_id": 
question_id,
+            "all_options": all_options,
+            "topic": topic,
+            "golden_answer": correct_answer,
+            "search_context": context,
+            "search_duration_ms": duration_ms,
+        }
+    )
+
+    os.makedirs(f"results/pm/{frame}-{version}/tmp", exist_ok=True)
+    with open(
+        f"results/pm/{frame}-{version}/tmp/{frame}_pm_search_results_{conv_idx}.json", "w"
+    ) as f:
+        json.dump(search_results, f, indent=4)
+    print(f"💾 \033[92mSearch results for conversation {conv_idx} saved...")
+    print("-" * 80)
+
+    return search_results
+
+
+def load_existing_results(frame, version, group_idx):
+    result_path = (
+        f"results/pm/{frame}-{version}/tmp/{frame}_pm_search_results_{group_idx}.json"
+    )
+    if os.path.exists(result_path):
+        try:
+            with open(result_path) as f:
+                return json.load(f), True
+        except Exception as e:
+            print(f"\033[91m❌ Error loading existing results for group {group_idx}: {e}")
+    return {}, False
+
+
+def main(frame, version, top_k=20, num_workers=2):
+    print("\n" + "=" * 80)
+    print(f"🔍 PERSONAMEM SEARCH - {frame.upper()} v{version}".center(80))
+    print("=" * 80)
+
+    question_csv_path = "data/personamem/questions_32k.csv"
+    context_jsonl_path = "data/personamem/shared_contexts_32k.jsonl"
+    total_rows = count_csv_rows(question_csv_path)
+
+    print(f"📚 Loaded PersonaMem dataset from {question_csv_path} and {context_jsonl_path}")
+    print(f"📊 Total conversations: {total_rows}")
+    print(f"⚙️ Search parameters: top_k={top_k}, workers={num_workers}")
+    print("-" * 80)
+
+    all_search_results = defaultdict(list)
+    start_time = datetime.now()
+
+    all_data = list(load_rows_with_context(question_csv_path, context_jsonl_path))
+    with ThreadPoolExecutor(max_workers=num_workers) as executor:
+        future_to_idx = {
+            executor.submit(
+                process_user,
+                row_data=row_data,
+                version=version,
+                conv_idx=idx,
+                frame=frame,
+            ): idx
+            for idx, (row_data, _) in enumerate(all_data)
+        }
+
+        for future in tqdm(
+            as_completed(future_to_idx), total=len(future_to_idx), desc="Processing 
conversations" + ): + idx = future_to_idx[future] + try: + search_results = future.result() + for user_id, results in search_results.items(): + all_search_results[user_id].extend(results) + print(f"โœ… Conversation {idx} processed successfully.") + except Exception as exc: + print(f"\nโŒ Conversation {idx} generated an exception: {exc}") + + end_time = datetime.now() + elapsed_time = end_time - start_time + elapsed_time_str = str(elapsed_time).split(".")[0] + + print("\n" + "=" * 80) + print("โœ… \033[1;32mSEARCH COMPLETE".center(80)) + print("=" * 80) + print(f"โฑ๏ธ Total time taken to search {total_rows} users: \033[92m{elapsed_time_str}") + print(f"๐Ÿ”„ Framework: {frame} | Version: {version} | Workers: {num_workers}") + + with open(f"results/pm/{frame}-{version}/{frame}_pm_search_results.json", "w") as f: + json.dump(dict(all_search_results), f, indent=4) + print( + f"๐Ÿ“ Results saved to: \033[1;94mresults/pm/{frame}-{version}/{frame}_pm_search_results.json" + ) + print("=" * 80 + "\n") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="PersonaMem Search Script") + parser.add_argument( + "--lib", + type=str, + choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep"], + default="memos-api", + ) + parser.add_argument( + "--version", type=str, default="0925", help="Version of the evaluation framework." + ) + parser.add_argument( + "--top_k", type=int, default=20, help="Number of top results to retrieve from the search." + ) + parser.add_argument( + "--workers", type=int, default=3, help="Number of parallel workers for processing users." 
+ ) + + args = parser.parse_args() + + main(frame=args.lib, version=args.version, top_k=args.top_k, num_workers=args.workers) diff --git a/evaluation/scripts/run_prefeval_eval.sh b/evaluation/scripts/run_prefeval_eval.sh new file mode 100644 index 000000000..59f53dd68 --- /dev/null +++ b/evaluation/scripts/run_prefeval_eval.sh @@ -0,0 +1,47 @@ +#!/bin/bash + +# --- Configuration --- +# This script runs the PrefEval pipeline in three steps. +# You can configure the number of workers for parallel processing here. + +# Number of workers for scripts that support parallel execution +WORKERS=10 + +# Set the Hugging Face mirror endpoint +export HF_ENDPOINT="https://hf-mirror.com" + +echo "--- Starting PrefEval Pipeline ---" +echo "Configuration: WORKERS=$WORKERS, HF_ENDPOINT=$HF_ENDPOINT" +echo "" + +# --- Step 1: Preprocess the data --- +echo "Running prefeval_preprocess.py..." +python scripts/PrefEval/prefeval_preprocess.py +# Check if the last command executed successfully +if [ $? -ne 0 ]; then + echo "Error: Data preprocessing failed." + exit 1 +fi + +# --- Step 2: Generate responses using MemOS --- +echo "" +echo "Running pref_memos.py..." +# Pass the WORKERS variable to the script's --max-workers argument +python scripts/PrefEval/pref_memos.py --max-workers $WORKERS +if [ $? -ne 0 ]; then + echo "Error: Response generation with MemOS failed." + exit 1 +fi + +# --- Step 3: Evaluate the generated responses --- +echo "" +echo "Running pref_eval.py..." +# Pass the WORKERS variable to the script's --concurrency-limit argument +python scripts/PrefEval/pref_eval.py --concurrency-limit $WORKERS +if [ $? -ne 0 ]; then + echo "Error: Evaluation script failed." + exit 1 +fi + +echo "" +echo "--- PrefEval Pipeline completed successfully! 
---" \ No newline at end of file diff --git a/evaluation/scripts/utils/client.py b/evaluation/scripts/utils/client.py index 33aea7497..8fe166e7c 100644 --- a/evaluation/scripts/utils/client.py +++ b/evaluation/scripts/utils/client.py @@ -1,13 +1,7 @@ import json import os import sys - from dotenv import load_dotenv -from mem0 import MemoryClient -from memobase import MemoBaseClient -from zep_cloud.client import Zep -from zep_cloud.types import Message - sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from memobase import ChatBlob @@ -16,8 +10,6 @@ from memos.configs.mem_os import MOSConfig from memos.mem_cube.general import GeneralMemCube from memos.mem_os.product import MOSProduct -from utils.mem0_local import Mem0Client -from utils.memos_api import MemOSAPI from utils.memos_filters import filter_memory_data @@ -25,6 +17,8 @@ def zep_client(): + from zep_cloud.client import Zep + """Initialize and return a Zep client instance.""" api_key = os.getenv("ZEP_API_KEY") zep = Zep(api_key=api_key) @@ -32,16 +26,10 @@ def zep_client(): return zep -def mem0_client(mode="local"): - """Initialize and return a Mem0 client instance.""" - if mode == "local": - base_url = "http://localhost:9999" - mem0 = Mem0Client(base_url=base_url) - elif mode == "api": - mem0 = MemoryClient(api_key=os.getenv("MEM0_API_KEY")) - else: - raise ValueError("Invalid mode. 
Choose 'local' or 'cloud'.") +def mem0_client(mode="api"): + from mem0 import MemoryClient + mem0 = MemoryClient(api_key=os.getenv("MEM0_API_KEY")) return mem0 @@ -87,13 +75,12 @@ def memos_client( default_mem_cube=mem_cube, ) - elif mode == "api": - memos = MemOSAPI(base_url=os.getenv("MEMOS_BASE_URL")) - return memos def memobase_client(): + from memobase import MemoBaseClient + client = MemoBaseClient( project_url=os.getenv("MEMOBASE_PROJECT_URL"), api_key=os.getenv("MEMOBASE_API_KEY"), @@ -114,6 +101,7 @@ def memobase_client(): session_id=session_id, user_id=user_id, ) + from zep_cloud.types import Message messages = [ Message( diff --git a/evaluation/scripts/utils/memobase_utils.py b/evaluation/scripts/utils/memobase_utils.py index dcf06ea31..f818cadec 100644 --- a/evaluation/scripts/utils/memobase_utils.py +++ b/evaluation/scripts/utils/memobase_utils.py @@ -1,11 +1,5 @@ import time -import uuid - -from memobase import ChatBlob - - -def string_to_uuid(s: str, salt="memobase_client") -> str: - return str(uuid.uuid5(uuid.NAMESPACE_DNS, s + salt)) +from memobase import MemoBaseClient, ChatBlob def memobase_add_memory(user, message, retries=3): @@ -24,19 +18,26 @@ def memobase_add_memory(user, message, retries=3): def memobase_search_memory( client, user_id, query, max_memory_context_size, max_retries=3, retry_delay=1 ): + users = client.get_all_users(limit=5000) + for u in users: + try: + if u["additional_fields"]["user_id"] == user_id: + user = client.get_user(u["id"], no_get=True) + except: + pass + retries = 0 - real_uid = string_to_uuid(user_id) - u = client.get_user(real_uid, no_get=True) while retries < max_retries: try: - memories = u.context( + t = time.time() + memories = user.context( max_token_size=max_memory_context_size, chats=[{"role": "user", "content": query}], event_similarity_threshold=0.2, fill_window_with_events=True, ) - return memories + return memories, (time.time() - t) * 1000 except Exception as e: print(f"Error during memory search: 
{e}") print("Retrying...") diff --git a/evaluation/scripts/utils/memos_api.py b/evaluation/scripts/utils/memos_api.py index 7b7f2a061..5f73a78c3 100644 --- a/evaluation/scripts/utils/memos_api.py +++ b/evaluation/scripts/utils/memos_api.py @@ -1,63 +1,78 @@ import json +import os +import traceback import requests +from dotenv import load_dotenv + +load_dotenv() + +memos_key = os.getenv("MEMOS_KEY") +memos_url = os.getenv("MEMOS_URL") class MemOSAPI: - def __init__(self, base_url: str = "http://localhost:8000"): + def __init__(self, base_url: str = memos_url, memos_key: str = memos_key): self.base_url = base_url - self.headers = {"Content-Type": "application/json"} - - def user_register(self, user_id: str): - """Register a user.""" - url = f"{self.base_url}/users/register" - payload = json.dumps({"user_id": user_id}) - response = requests.request("POST", url, data=payload, headers=self.headers) - return response.text + self.headers = {"Content-Type": "application/json", "Authorization": memos_key} - def add(self, messages: list[dict], user_id: str | None = None): + def add(self, messages: list[dict], user_id: str | None = None, conv_id: str | None = None): """Create memories.""" - register_res = json.loads(self.user_register(user_id)) - cube_id = register_res["data"]["mem_cube_id"] - url = f"{self.base_url}/add" - payload = json.dumps({"messages": messages, "user_id": user_id, "mem_cube_id": cube_id}) + retry = 0 + while retry < 10: + try: + url = f"{self.base_url}/add/message" + payload = json.dumps( + {"messages": messages, "user_id": user_id, "conversation_id": conv_id} + ) + response = requests.request("POST", url, data=payload, headers=self.headers) + assert response.status_code == 200, response.text + assert json.loads(response.text)["code"] == 0, response.text + return response.text + except Exception as e: + print(f"call memos api add failed {e} retry time {retry}") + # traceback.print_exc() + retry += 1 + assert retry != 10, "add memory failed" - response = 
requests.request("POST", url, data=payload, headers=self.headers) - return response.text - - def search(self, query: str, user_id: str | None = None, top_k: int = 10): + def search( + self, query: str, user_id: str | None = None, conv_id: str | None = "", top_k: int = 10 + ): """Search memories.""" - url = f"{self.base_url}/search" - payload = json.dumps( - { - "query": query, - "user_id": user_id, - } - ) - - response = requests.request("POST", url, data=payload, headers=self.headers) - if response.status_code != 200: - response.raise_for_status() - else: - result = json.loads(response.text)["data"]["text_mem"][0]["memories"] - text_memories = [item["memory"] for item in result][:top_k] - return text_memories + retry = 0 + while retry < 10: + try: + url = f"{self.base_url}/search/memory" + payload = json.dumps( + { + "query": query, + "user_id": user_id, + "conversation_id": conv_id, + "memory_limit_number": top_k, + }, + ensure_ascii=False, + ) + + response = requests.request("POST", url, data=payload, headers=self.headers) + assert response.status_code == 200, response.text + assert json.loads(response.text)["code"] == 0, response.text + return json.loads(response.text)["data"] + except Exception as e: + print(f"call memos api search failed {e} retry time {retry}") + # traceback.print_exc() + retry += 1 + assert retry != 10, "search memory failed" if __name__ == "__main__": - client = MemOSAPI(base_url="http://localhost:8000") - # Example usage - try: - messages = [ - { - "role": "user", - "content": "I went to the store and bought a red apple.", - "chat_time": "2023-10-01T12:00:00Z", - } - ] - add_response = client.add(messages, user_id="user789") - print("Add memory response:", add_response) - search_response = client.search("red apple", user_id="user789", top_k=1) - print("Search memory response:", search_response) - except requests.RequestException as e: - print("An error occurred:", e) + client = MemOSAPI() + user_id = "eval_test" + conv_id = 
"eval_test_benchmark1_conv1" + messages = [ + {"role": "user", "content": "ๆญๅทž่ฅฟๆน–ๆœ‰ไป€ไนˆๅฅฝ็Žฉ็š„"}, + {"role": "assistant", "content": "ๆญๅทž่ฅฟๆน–ๆœ‰ๅฅฝๅคšๆพ้ผ ๏ผŒ่ฟ˜ๆœ‰ๆ–ญๆกฅ"}, + ] + + memories = client.search("ๆˆ‘ๆœ€่ฟ‘ๆœ‰ไป€ไนˆ่ฎฐๅฟ†", user_id=user_id, top_k=6) + response = client.add(messages, user_id, conv_id) + memories = client.search("ๆˆ‘ๆœ€่ฟ‘ๆœ‰ไป€ไนˆ่ฎฐๅฟ†", user_id=user_id, top_k=6)