diff --git a/evaluation/.env-example b/evaluation/.env-example
index fc57344da..4b2b9311f 100644
--- a/evaluation/.env-example
+++ b/evaluation/.env-example
@@ -3,21 +3,28 @@ MODEL="gpt-4o-mini"
 OPENAI_API_KEY="sk-***REDACTED***"
 OPENAI_BASE_URL="http://***.***.***.***:3000/v1"
-MEM0_API_KEY="m0-***REDACTED***"
-
-ZEP_API_KEY="z_***REDACTED***"
 
 # response model
 CHAT_MODEL="gpt-4o-mini"
 CHAT_MODEL_BASE_URL="http://***.***.***.***:3000/v1"
 CHAT_MODEL_API_KEY="sk-***REDACTED***"
 
+# memos
 MEMOS_KEY="Token mpg-xxxxx"
-MEMOS_URL="https://apigw-pre.memtensor.cn/api/openmem/v1"
-PRE_SPLIT_CHUNK=false # pre split chunk in client end
+MEMOS_URL="http://127.0.0.1:8001"
+MEMOS_ONLINE_URL="https://memos.memtensor.cn/api/openmem/v1"
+
+# other memory agents
+MEM0_API_KEY="m0-xxx"
+ZEP_API_KEY="z_xxx"
+MEMU_API_KEY="mu_xxx"
+SUPERMEMORY_API_KEY="sm_xxx"
+MEMOBASE_API_KEY="xxx"
+MEMOBASE_PROJECT_URL="http://***.***.***.***:8019"
+
+# eval settings
+PRE_SPLIT_CHUNK=false
 
-MEMOBASE_API_KEY="xxxxx"
-MEMOBASE_PROJECT_URL="http://xxx.xxx.xxx.xxx:8019"
 
 # Configuration Only For Scheduler
 # RabbitMQ Configuration
diff --git a/evaluation/README.md b/evaluation/README.md
index 16752c075..f0bd166e1 100644
--- a/evaluation/README.md
+++ b/evaluation/README.md
@@ -21,7 +21,14 @@ This repository provides tools and scripts for evaluating the LoCoMo dataset usi
 2. Copy the `configs-example/` directory to a new directory named `configs/`, and modify the configuration files inside it as needed. This directory contains model and API-specific settings.
 
+## Setup MemOS
+```bash
+# start the server
+uvicorn memos.api.server_api:app --host 0.0.0.0 --port 8001 --workers 8
+# then set MEMOS_URL in your .env file
+MEMOS_URL="http://127.0.0.1:8001"
+```
 ## Evaluation Scripts
 
 ### LoCoMo Evaluation
@@ -45,10 +52,20 @@
 First prepare the dataset `longmemeval_s` from https://huggingface.co/datasets/x
 ./scripts/run_lme_eval.sh
 ```
 
-### prefEval Evaluation
+### PrefEval Evaluation
+To evaluate the **PrefEval** dataset using one of the supported memory frameworks (`memos`, `mem0`, or `zep`), run the following [script](./scripts/run_prefeval_eval.sh):
 
-### personaMem Evaluation
+```bash
+# Edit the configuration in ./scripts/run_prefeval_eval.sh
+# Specify the model and memory backend you want to use (e.g., mem0, zep, etc.)
+./scripts/run_prefeval_eval.sh
+```
+
+### PersonaMem Evaluation
 
 get `questions_32k.csv` and `shared_contexts_32k.jsonl` from https://huggingface.co/datasets/bowen-upenn/PersonaMem and save them at `data/personamem/`
 
 ```bash
+# Edit the configuration in ./scripts/run_pm_eval.sh
+# Specify the model and memory backend you want to use (e.g., mem0, zep, etc.)
+# If you want to use MIRIX, edit the configuration in ./scripts/personamem/config.yaml
 ./scripts/run_pm_eval.sh
 ```
diff --git a/evaluation/scripts/PrefEval/pref_eval.py b/evaluation/scripts/PrefEval/pref_eval.py
index cd9c5dde2..10cf41bf3 100644
--- a/evaluation/scripts/PrefEval/pref_eval.py
+++ b/evaluation/scripts/PrefEval/pref_eval.py
@@ -15,10 +15,6 @@
 API_KEY = os.getenv("OPENAI_API_KEY")
 API_URL = os.getenv("OPENAI_BASE_URL")
 
-INPUT_FILE = "./results/prefeval/pref_memos_process.jsonl"
-OUTPUT_FILE = "./results/prefeval/eval_pref_memos.jsonl"
-OUTPUT_EXCEL_FILE = "./results/prefeval/eval_pref_memos_summary.xlsx"
-
 
 async def call_gpt4o_mini_async(client: OpenAI, prompt: str) -> str:
     messages = [{"role": "user", "content": prompt}]
@@ -255,9 +251,10 @@ def generate_excel_summary(
     avg_search_time: float,
     avg_context_tokens: float,
     avg_add_time: float,
+    output_excel_file: str,
     model_name: str = "gpt-4o-mini",
 ):
-    print(f"Generating Excel summary at {OUTPUT_EXCEL_FILE}...")
+    print(f"Generating Excel summary at {output_excel_file}...")
 
     def get_pct(key):
         return summary_results.get(key, {}).get("percentage", 0)
@@ -282,7 +279,7 @@ def get_pct(key):
 
     df = pd.DataFrame(data)
 
-    with pd.ExcelWriter(OUTPUT_EXCEL_FILE, engine="xlsxwriter") as writer:
+    with pd.ExcelWriter(output_excel_file, engine="xlsxwriter") as writer:
         df.to_excel(writer, index=False, sheet_name="Summary")
 
         workbook = writer.book
@@ -300,10 +297,10 @@ def get_pct(key):
         bold_pct_format = workbook.add_format({"num_format": "0.0%", "bold": True})
         worksheet.set_column("F:F", 18, bold_pct_format)
 
-    print(f"Successfully saved summary to {OUTPUT_EXCEL_FILE}")
+    print(f"Successfully saved summary to {output_excel_file}")
 
 
-async def main(concurrency_limit: int):
+async def main(concurrency_limit: int, input_file: str, output_file: str, output_excel_file: str):
     semaphore = asyncio.Semaphore(concurrency_limit)
     error_counter = Counter()
@@ -313,17 +310,17 @@
     total_add_time = 0
 
     print(f"Starting evaluation with a concurrency limit of {concurrency_limit}...")
-    print(f"Input file: {INPUT_FILE}")
-    print(f"Output JSONL: {OUTPUT_FILE}")
-    print(f"Output Excel: {OUTPUT_EXCEL_FILE}")
+    print(f"Input file: {input_file}")
+    print(f"Output JSONL: {output_file}")
+    print(f"Output Excel: {output_excel_file}")
 
     client = OpenAI(api_key=API_KEY, base_url=API_URL)
 
     try:
-        with open(INPUT_FILE, "r", encoding="utf-8") as f:
+        with open(input_file, "r", encoding="utf-8") as f:
             lines = f.readlines()
     except FileNotFoundError:
-        print(f"Error: Input file not found at '{INPUT_FILE}'")
+        print(f"Error: Input file not found at '{input_file}'")
         return
 
     if not lines:
@@ -332,7 +329,7 @@
 
     tasks = [process_line(line, client, semaphore) for line in lines]
 
-    with open(OUTPUT_FILE, "w", encoding="utf-8") as outfile:
+    with open(output_file, "w", encoding="utf-8") as outfile:
         pbar = tqdm(
             asyncio.as_completed(tasks),
             total=len(tasks),
@@ -382,6 +379,7 @@
                 avg_search_time,
                 avg_context_tokens,
                 avg_add_time,
+                output_excel_file,
             )
         except Exception as e:
             print(f"\nFailed to generate Excel file: {e}")
@@ -389,6 +387,11 @@
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Evaluate assistant responses from a JSONL file.")
+
+    parser.add_argument(
+        "--input", type=str, required=True, help="Path to the input JSONL file from pref_memos.py."
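+        # The eval outputs (JSONL results and Excel summary) are written to this input file's directory.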
+ ) + parser.add_argument( "--concurrency-limit", type=int, @@ -397,4 +400,17 @@ async def main(concurrency_limit: int): ) args = parser.parse_args() - asyncio.run(main(concurrency_limit=args.concurrency_limit)) + input_path = args.input + output_dir = os.path.dirname(input_path) + + output_jsonl_path = os.path.join(output_dir, "eval_pref_memos.jsonl") + output_excel_path = os.path.join(output_dir, "eval_pref_memos_summary.xlsx") + + asyncio.run( + main( + concurrency_limit=args.concurrency_limit, + input_file=input_path, + output_file=output_jsonl_path, + output_excel_file=output_excel_path, + ) + ) diff --git a/evaluation/scripts/PrefEval/pref_mem0.py b/evaluation/scripts/PrefEval/pref_mem0.py new file mode 100644 index 000000000..416d8045f --- /dev/null +++ b/evaluation/scripts/PrefEval/pref_mem0.py @@ -0,0 +1,295 @@ +import argparse +import concurrent.futures +import json +import os +import sys +import time +import tiktoken +from dotenv import load_dotenv +from openai import OpenAI +from tqdm import tqdm + +from irrelevant_conv import irre_10, irre_300 + +ROOT_DIR = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) +EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts") + +sys.path.insert(0, ROOT_DIR) +sys.path.insert(0, EVAL_SCRIPTS_DIR) +load_dotenv() +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +BASE_URL = os.getenv("OPENAI_BASE_URL") +MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini") +tokenizer = tiktoken.get_encoding("cl100k_base") +os.environ["MEM0_API_KEY"] = os.getenv("MEM0_API_KEY") + + +def add_memory_for_line( + line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str +) -> dict: + """ + Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. + """ + i, line = line_data + user_id = f"{lib}_user_pref_eval_{i}_{version}" + + try: + original_data = json.loads(line) + conversation = original_data.get("conversation", []) + + if num_irrelevant_turns == 10: + conversation = conversation + irre_10 + elif num_irrelevant_turns == 300: + conversation = conversation + irre_300 + + turns_add = 5 + start_time_add = time.monotonic() + if conversation: + for chunk_start in range(0, len(conversation), turns_add * 2): + chunk = conversation[chunk_start : chunk_start + turns_add * 2] + timestamp_add = int(time.time() * 100) + mem_client.add(messages=chunk, user_id=user_id, timestamp=timestamp_add) + end_time_add = time.monotonic() + add_duration = end_time_add - start_time_add + + original_data["user_id"] = user_id + original_data["metrics"] = {"add_memories_duration_seconds": add_duration} + return original_data + + except Exception as e: + print(f"Error adding memory for line {i + 1} (user_id: {user_id}): {e}") + return None + + +def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> dict: + """ + Processes a single line of data, searching memory based on the question. + """ + i, line = line_data + try: + original_data = json.loads(line) + + user_id = original_data.get("user_id") + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + + if not user_id: + original_data["error"] = ( + "Error: user_id not found in this line. Please run 'add' mode first." + ) + return original_data + if not question: + original_data["error"] = "Question not found in this line." 
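+            # Flag the record instead of raising; 'response' mode passes errored lines through unchanged.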
+ return original_data + + start_time_search = time.monotonic() + relevant_memories = mem_client.search(query=question, user_id=user_id, top_k=top_k_value) + search_memories_duration = time.monotonic() - start_time_search + memory_list = relevant_memories.get("results", []) + memories_str = "\n".join(f"- {entry['memory']}" for entry in memory_list) + + memory_tokens_used = len(tokenizer.encode(memories_str)) + + metrics_dict.update( + { + "search_memories_duration_seconds": search_memories_duration, + "memory_tokens_used": memory_tokens_used, + "retrieved_memories_text": memories_str, + } + ) + original_data["metrics"] = metrics_dict + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error searching memory for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def generate_response_for_line(line_data: tuple, openai_client: OpenAI) -> dict: + """ + Generates a response for a single line of data using pre-fetched memories. + """ + i, line = line_data + try: + original_data = json.loads(line) + + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + memories_str = metrics_dict.get("retrieved_memories_text") + + # If an error occurred in 'add' or 'search' mode, just pass the line through + if original_data.get("error"): + return original_data + + if not question: + original_data["error"] = "Question not found in this line." + return original_data + + # Check for None, as an empty string (no memories found) is a valid result + if memories_str is None: + original_data["error"] = ( + "Error: retrieved_memories_text not found in metrics. " + "Please run 'search' mode first." + ) + return original_data + + system_prompt = f"You are a helpful AI. Answer the question based on the query and the following memories:\nUser Memories:\n{memories_str}" + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": question}, + ] + + response = openai_client.chat.completions.create(model=MODEL_NAME, messages=messages) + assistant_response = response.choices[0].message.content + original_data["response"] = assistant_response + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error generating response for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def main(): + parser = argparse.ArgumentParser( + description="Process conversations with MemOS. Run 'add', then 'search', then 'response'." 
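+        # The modes form a pipeline: 'add' assigns user_ids, 'search' attaches retrieved memories, 'response' generates answers.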
+ ) + parser.add_argument( + "mode", + choices=["add", "search", "response"], + help="The mode to run the script in ('add', 'search', or 'response').", + ) + parser.add_argument("--input", required=True, help="Path to the input JSONL file.") + parser.add_argument("--output", required=True, help="Path to the output JSONL file.") + parser.add_argument( + "--top-k", + type=int, + default=10, + help="Number of memories to retrieve (used in 'search' mode).", + ) + parser.add_argument( + "--add-turn", + type=int, + choices=[0, 10, 300], + default=0, + help="Number of irrelevant turns to add (used in 'add' mode).", + ) + parser.add_argument( + "--lib", + type=str, + choices=["mem0", "mem0_graph"], + default="mem0", + help="Which Mem0 library to use (used in 'add' mode).", + ) + parser.add_argument( + "--version", + type=str, + default="0929-1", + help="Version identifier for user_id generation (used in 'add' mode).", + ) + parser.add_argument( + "--max-workers", type=int, default=20, help="Maximum number of concurrent workers." + ) + + args = parser.parse_args() + + try: + with open(args.input, "r", encoding="utf-8") as infile: + lines = infile.readlines() + except FileNotFoundError: + print(f"Error: Input file '{args.input}' not found") + return + + from utils.client import Mem0Client + + mem_client = Mem0Client(enable_graph="graph" in args.lib) + + if args.mode == "add": + print(f"Running in 'add' mode. Ingesting memories from '{args.input}'...") + print(f"Adding {args.add_turn} irrelevant turns.") + print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit( + add_memory_for_line, + (i, line), + mem_client, + args.add_turn, + args.lib, + args.version, + ) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Adding memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print(f"\n'add' mode complete! Data with user_id written to '{args.output}'.") + + elif args.mode == "search": + print(f"Running in 'search' mode. Searching memories based on '{args.input}'...") + print(f"Retrieving top {args.top_k} memories for each query.") + print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(search_memory_for_line, (i, line), mem_client, args.top_k) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Searching memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print( + f"\n'search' mode complete! Results with retrieved memories written to '{args.output}'." + ) + + elif args.mode == "response": + print(f"Running in 'response' mode. 
Generating responses based on '{args.input}'...") + print(f"Using {args.max_workers} workers.") + openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url=BASE_URL) + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(generate_response_for_line, (i, line), openai_client) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Generating responses...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print(f"\n'response' mode complete! Final results written to '{args.output}'.") + + +if __name__ == "__main__": + main() diff --git a/evaluation/scripts/PrefEval/pref_memobase.py b/evaluation/scripts/PrefEval/pref_memobase.py new file mode 100644 index 000000000..34d3ea86f --- /dev/null +++ b/evaluation/scripts/PrefEval/pref_memobase.py @@ -0,0 +1,306 @@ +import argparse +import concurrent.futures +import json +import os +import sys +import time +import tiktoken +from dotenv import load_dotenv +from openai import OpenAI +from tqdm import tqdm +import time +from irrelevant_conv import irre_10, irre_300 + +ROOT_DIR = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) +EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts") + +sys.path.insert(0, ROOT_DIR) +sys.path.insert(0, EVAL_SCRIPTS_DIR) +load_dotenv() +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +BASE_URL = os.getenv("OPENAI_BASE_URL") +MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini") +tokenizer = tiktoken.get_encoding("cl100k_base") + + +def add_memory_for_line( + line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str +) -> dict: + """ + Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. + """ + i, line = line_data + user_id = f"{lib}_user_pref_eval_{i}_{version}" + mem_client.delete_user(user_id) + user_id = mem_client.client.add_user({"user_id": user_id}) + print("user_id:", user_id) + try: + original_data = json.loads(line) + conversation = original_data.get("conversation", []) + + if num_irrelevant_turns == 10: + conversation = conversation + irre_10 + elif num_irrelevant_turns == 300: + conversation = conversation + irre_300 + + start_time_add = time.monotonic() + if conversation: + messages = [] + + for chunk_start in range(0, len(conversation)): + chunk = conversation[chunk_start : chunk_start + 1] + timestamp_add = str(int(time.time() * 100)) + time.sleep(0.001) # Ensure unique timestamp + + messages.append( + { + "role": chunk[0]["role"], + "content": chunk[0]["content"][:8000], + "created_at": timestamp_add, + } + ) + mem_client.add(messages=messages, user_id=user_id) + + end_time_add = time.monotonic() + add_duration = end_time_add - start_time_add + + original_data["user_id"] = user_id + original_data["metrics"] = {"add_memories_duration_seconds": add_duration} + return original_data + + except Exception as e: + print(f"Error adding memory for line {i + 1} (user_id: {user_id}): {e}") + return None + + +def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> dict: + """ + Processes a single line of data, searching memory based on the question. 
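+    Returns the record augmented with retrieval metrics, or None if the search raises an exception.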
+ """ + i, line = line_data + try: + original_data = json.loads(line) + + user_id = original_data.get("user_id") + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + + if not user_id: + original_data["error"] = ( + "Error: user_id not found in this line. Please run 'add' mode first." + ) + return original_data + if not question: + original_data["error"] = "Question not found in this line." + return original_data + + start_time_search = time.monotonic() + relevant_memories = mem_client.search(query=question, user_id=user_id, top_k=top_k_value) + search_memories_duration = time.monotonic() - start_time_search + memories_str = relevant_memories + + memory_tokens_used = len(tokenizer.encode(memories_str)) + + metrics_dict.update( + { + "search_memories_duration_seconds": search_memories_duration, + "memory_tokens_used": memory_tokens_used, + "retrieved_memories_text": memories_str, + } + ) + original_data["metrics"] = metrics_dict + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error searching memory for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def generate_response_for_line(line_data: tuple, openai_client: OpenAI) -> dict: + """ + Generates a response for a single line of data using pre-fetched memories. + """ + i, line = line_data + try: + original_data = json.loads(line) + + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + memories_str = metrics_dict.get("retrieved_memories_text") + + # If an error occurred in 'add' or 'search' mode, just pass the line through + if original_data.get("error"): + return original_data + + if not question: + original_data["error"] = "Question not found in this line." + return original_data + + # Check for None, as an empty string (no memories found) is a valid result + if memories_str is None: + original_data["error"] = ( + "Error: retrieved_memories_text not found in metrics. " + "Please run 'search' mode first." + ) + return original_data + + system_prompt = f"You are a helpful AI. Answer the question based on the query and the following memories:\nUser Memories:\n{memories_str}" + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": question}, + ] + + response = openai_client.chat.completions.create(model=MODEL_NAME, messages=messages) + assistant_response = response.choices[0].message.content + original_data["response"] = assistant_response + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error generating response for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def main(): + parser = argparse.ArgumentParser( + description="Process conversations with MemOS. Run 'add', then 'search', then 'response'." 
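+        # Each stage's --output file is the next stage's --input.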
+ ) + parser.add_argument( + "mode", + choices=["add", "search", "response"], + help="The mode to run the script in ('add', 'search', or 'response').", + ) + parser.add_argument("--input", required=True, help="Path to the input JSONL file.") + parser.add_argument("--output", required=True, help="Path to the output JSONL file.") + parser.add_argument( + "--top-k", + type=int, + default=10, + help="Number of memories to retrieve (used in 'search' mode).", + ) + parser.add_argument( + "--add-turn", + type=int, + choices=[0, 10, 300], + default=0, + help="Number of irrelevant turns to add (used in 'add' mode).", + ) + parser.add_argument( + "--lib", + type=str, + choices=["memobase"], + default="memobase", + help="Which Memobase library to use (used in 'add' mode).", + ) + parser.add_argument( + "--version", + type=str, + default="0929-1", + help="Version identifier for user_id generation (used in 'add' mode).", + ) + parser.add_argument( + "--max-workers", type=int, default=20, help="Maximum number of concurrent workers." + ) + + args = parser.parse_args() + + try: + with open(args.input, "r", encoding="utf-8") as infile: + lines = infile.readlines() + except FileNotFoundError: + print(f"Error: Input file '{args.input}' not found") + return + + from utils.client import MemobaseClient + + mem_client = MemobaseClient() + + if args.mode == "add": + print(f"Running in 'add' mode. Ingesting memories from '{args.input}'...") + print(f"Adding {args.add_turn} irrelevant turns.") + print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit( + add_memory_for_line, + (i, line), + mem_client, + args.add_turn, + args.lib, + args.version, + ) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Adding memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print(f"\n'add' mode complete! Data with user_id written to '{args.output}'.") + + elif args.mode == "search": + print(f"Running in 'search' mode. Searching memories based on '{args.input}'...") + print(f"Retrieving top {args.top_k} memories for each query.") + print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(search_memory_for_line, (i, line), mem_client, args.top_k) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Searching memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print( + f"\n'search' mode complete! Results with retrieved memories written to '{args.output}'." + ) + + elif args.mode == "response": + print(f"Running in 'response' mode. 
Generating responses based on '{args.input}'...") + print(f"Using {args.max_workers} workers.") + openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url=BASE_URL) + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(generate_response_for_line, (i, line), openai_client) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Generating responses...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print(f"\n'response' mode complete! Final results written to '{args.output}'.") + + +if __name__ == "__main__": + main() diff --git a/evaluation/scripts/PrefEval/pref_memos.py b/evaluation/scripts/PrefEval/pref_memos.py index d1a901dd2..5ee064b1f 100644 --- a/evaluation/scripts/PrefEval/pref_memos.py +++ b/evaluation/scripts/PrefEval/pref_memos.py @@ -64,11 +64,9 @@ def add_memory_for_line( return None -def process_line_with_id( - line_data: tuple, mem_client, openai_client: OpenAI, top_k_value: int, lib: str, version: str -) -> dict: +def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> dict: """ - Processes a single line of data using a pre-existing user_id, searching memory and generating a response. + Processes a single line of data, searching memory based on the question. """ i, line = line_data try: @@ -79,12 +77,12 @@ def process_line_with_id( metrics_dict = original_data.get("metrics", {}) if not user_id: - original_data["response"] = ( + original_data["error"] = ( "Error: user_id not found in this line. Please run 'add' mode first." ) return original_data if not question: - original_data["response"] = "Question not found in this line." + original_data["error"] = "Question not found in this line." return original_data start_time_search = time.monotonic() @@ -96,6 +94,51 @@ def process_line_with_id( memory_tokens_used = len(tokenizer.encode(memories_str)) + metrics_dict.update( + { + "search_memories_duration_seconds": search_memories_duration, + "memory_tokens_used": memory_tokens_used, + "retrieved_memories_text": memories_str, + } + ) + original_data["metrics"] = metrics_dict + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error searching memory for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def generate_response_for_line(line_data: tuple, openai_client: OpenAI) -> dict: + """ + Generates a response for a single line of data using pre-fetched memories. + """ + i, line = line_data + try: + original_data = json.loads(line) + + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + memories_str = metrics_dict.get("retrieved_memories_text") + + # If an error occurred in 'add' or 'search' mode, just pass the line through + if original_data.get("error"): + return original_data + + if not question: + original_data["error"] = "Question not found in this line." + return original_data + + # Check for None, as an empty string (no memories found) is a valid result + if memories_str is None: + original_data["error"] = ( + "Error: retrieved_memories_text not found in metrics. " + "Please run 'search' mode first." + ) + return original_data + system_prompt = f"You are a helpful AI. 
Answer the question based on the query and the following memories:\nUser Memories:\n{memories_str}" messages = [ {"role": "system", "content": system_prompt}, @@ -106,51 +149,50 @@ def process_line_with_id( assistant_response = response.choices[0].message.content original_data["response"] = assistant_response - metrics_dict.update( - { - "search_memories_duration_seconds": search_memories_duration, - "memory_tokens_used": memory_tokens_used, - "retrieved_memories_text": memories_str, - } - ) - original_data["metrics"] = metrics_dict - return original_data except Exception as e: user_id_from_data = json.loads(line).get("user_id", "N/A") - print(f"Error processing line {i + 1} (user_id: {user_id_from_data}): {e}") + print(f"Error generating response for line {i + 1} (user_id: {user_id_from_data}): {e}") return None def main(): parser = argparse.ArgumentParser( - description="Process conversations with MemOS. Run 'add' mode first, then 'process' mode." + description="Process conversations with MemOS. Run 'add', then 'search', then 'response'." ) parser.add_argument( "mode", - choices=["add", "process"], - help="The mode to run the script in ('add' or 'process').", + choices=["add", "search", "response"], + help="The mode to run the script in ('add', 'search', or 'response').", ) parser.add_argument("--input", required=True, help="Path to the input JSONL file.") parser.add_argument("--output", required=True, help="Path to the output JSONL file.") - parser.add_argument("--top-k", type=int, default=10, help="Number of memories to retrieve.") + parser.add_argument( + "--top-k", + type=int, + default=10, + help="Number of memories to retrieve (used in 'search' mode).", + ) parser.add_argument( "--add-turn", type=int, choices=[0, 10, 300], default=0, - help="Number of irrelevant turns to add (0, 10, or 300).", + help="Number of irrelevant turns to add (used in 'add' mode).", ) parser.add_argument( "--lib", type=str, choices=["memos-api", "memos-local"], default="memos-api", - help="Which MemOS library to use.", + help="Which MemOS library to use (used in 'add' mode).", ) parser.add_argument( - "--version", type=str, default="0929-1", help="Version identifier for user_id generation." + "--version", + type=str, + default="0929-1", + help="Version identifier for user_id generation (used in 'add' mode).", ) parser.add_argument( "--max-workers", type=int, default=20, help="Maximum number of concurrent workers." @@ -165,9 +207,9 @@ def main(): print(f"Error: Input file '{args.input}' not found") return - from utils.client import memosApiClient + from utils.client import MemosApiClient - mem_client = memosApiClient() + mem_client = MemosApiClient() if args.mode == "add": print(f"Running in 'add' mode. Ingesting memories from '{args.input}'...") @@ -200,38 +242,55 @@ def main(): outfile.write(json.dumps(result, ensure_ascii=False) + "\n") print(f"\n'add' mode complete! Data with user_id written to '{args.output}'.") - elif args.mode == "process": - print(f"Running in 'process' mode. Processing questions from '{args.input}'...") + elif args.mode == "search": + print(f"Running in 'search' mode. 
Searching memories based on '{args.input}'...") print(f"Retrieving top {args.top_k} memories for each query.") print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(search_memory_for_line, (i, line), mem_client, args.top_k) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Searching memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print( + f"\n'search' mode complete! Results with retrieved memories written to '{args.output}'." + ) + + elif args.mode == "response": + print(f"Running in 'response' mode. Generating responses based on '{args.input}'...") + print(f"Using {args.max_workers} workers.") openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url=BASE_URL) with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, ): futures = [ - executor.submit( - process_line_with_id, - (i, line), - mem_client, - openai_client, - args.top_k, - args.lib, - args.version, - ) + executor.submit(generate_response_for_line, (i, line), openai_client) for i, line in enumerate(lines) ] pbar = tqdm( concurrent.futures.as_completed(futures), total=len(lines), - desc="Processing questions...", + desc="Generating responses...", ) for future in pbar: result = future.result() if result: outfile.write(json.dumps(result, ensure_ascii=False) + "\n") - print(f"\n'process' mode complete! Final results written to '{args.output}'.") + print(f"\n'response' mode complete! Final results written to '{args.output}'.") if __name__ == "__main__": diff --git a/evaluation/scripts/PrefEval/pref_memu.py b/evaluation/scripts/PrefEval/pref_memu.py new file mode 100644 index 000000000..719f2b488 --- /dev/null +++ b/evaluation/scripts/PrefEval/pref_memu.py @@ -0,0 +1,301 @@ +import argparse +import concurrent.futures +import json +import os +import sys +import time +import tiktoken +from dotenv import load_dotenv +from openai import OpenAI +from tqdm import tqdm +from datetime import datetime +from irrelevant_conv import irre_10, irre_300 + +ROOT_DIR = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) +EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts") + +sys.path.insert(0, ROOT_DIR) +sys.path.insert(0, EVAL_SCRIPTS_DIR) +load_dotenv() +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +BASE_URL = os.getenv("OPENAI_BASE_URL") +MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini") +tokenizer = tiktoken.get_encoding("cl100k_base") + + +def add_memory_for_line( + line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str +) -> dict: + """ + Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
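+    When PRE_SPLIT_CHUNK=true, the conversation is ingested in chunks of five user/assistant turn pairs.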
+ """ + i, line = line_data + user_id = f"{lib}_user_pref_eval_{i}_{version}" + + try: + original_data = json.loads(line) + conversation = original_data.get("conversation", []) + + if num_irrelevant_turns == 10: + conversation = conversation + irre_10 + elif num_irrelevant_turns == 300: + conversation = conversation + irre_300 + + turns_add = 5 + start_time_add = time.monotonic() + if conversation: + if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": + for chunk_start in range(0, len(conversation), turns_add * 2): + chunk = conversation[chunk_start : chunk_start + turns_add * 2] + mem_client.add( + messages=chunk, user_id=user_id, iso_date=datetime.now().isoformat() + ) + else: + mem_client.add( + messages=conversation, user_id=user_id, iso_date=datetime.now().isoformat() + ) + end_time_add = time.monotonic() + add_duration = end_time_add - start_time_add + + original_data["user_id"] = user_id + original_data["metrics"] = {"add_memories_duration_seconds": add_duration} + return original_data + + except Exception as e: + print(f"Error adding memory for line {i + 1} (user_id: {user_id}): {e}") + return None + + +def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> dict: + """ + Processes a single line of data, searching memory based on the question. + """ + i, line = line_data + try: + original_data = json.loads(line) + + user_id = original_data.get("user_id") + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + + if not user_id: + original_data["error"] = ( + "Error: user_id not found in this line. Please run 'add' mode first." + ) + return original_data + if not question: + original_data["error"] = "Question not found in this line." + return original_data + + start_time_search = time.monotonic() + relevant_memories = mem_client.search(query=question, user_id=user_id, top_k=top_k_value) + search_memories_duration = time.monotonic() - start_time_search + memories_str = "\n".join( + f"- {entry.get('memory', '')}" for entry in relevant_memories["text_mem"][0]["memories"] + ) + + memory_tokens_used = len(tokenizer.encode(memories_str)) + + metrics_dict.update( + { + "search_memories_duration_seconds": search_memories_duration, + "memory_tokens_used": memory_tokens_used, + "retrieved_memories_text": memories_str, + } + ) + original_data["metrics"] = metrics_dict + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error searching memory for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def generate_response_for_line(line_data: tuple, openai_client: OpenAI) -> dict: + """ + Generates a response for a single line of data using pre-fetched memories. + """ + i, line = line_data + try: + original_data = json.loads(line) + + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + memories_str = metrics_dict.get("retrieved_memories_text") + + # If an error occurred in 'add' or 'search' mode, just pass the line through + if original_data.get("error"): + return original_data + + if not question: + original_data["error"] = "Question not found in this line." + return original_data + + # Check for None, as an empty string (no memories found) is a valid result + if memories_str is None: + original_data["error"] = ( + "Error: retrieved_memories_text not found in metrics. " + "Please run 'search' mode first." + ) + return original_data + + system_prompt = f"You are a helpful AI. 
Answer the question based on the query and the following memories:\nUser Memories:\n{memories_str}" + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": question}, + ] + + response = openai_client.chat.completions.create(model=MODEL_NAME, messages=messages) + assistant_response = response.choices[0].message.content + original_data["response"] = assistant_response + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error generating response for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def main(): + parser = argparse.ArgumentParser( + description="Process conversations with MemOS. Run 'add', then 'search', then 'response'." + ) + parser.add_argument( + "mode", + choices=["add", "search", "response"], + help="The mode to run the script in ('add', 'search', or 'response').", + ) + parser.add_argument("--input", required=True, help="Path to the input JSONL file.") + parser.add_argument("--output", required=True, help="Path to the output JSONL file.") + parser.add_argument( + "--top-k", + type=int, + default=10, + help="Number of memories to retrieve (used in 'search' mode).", + ) + parser.add_argument( + "--add-turn", + type=int, + choices=[0, 10, 300], + default=0, + help="Number of irrelevant turns to add (used in 'add' mode).", + ) + parser.add_argument( + "--lib", + type=str, + choices=["memu"], + default="memu", + help="Which Memu library to use (used in 'add' mode).", + ) + parser.add_argument( + "--version", + type=str, + default="0929-1", + help="Version identifier for user_id generation (used in 'add' mode).", + ) + parser.add_argument( + "--max-workers", type=int, default=20, help="Maximum number of concurrent workers." + ) + + args = parser.parse_args() + + try: + with open(args.input, "r", encoding="utf-8") as infile: + lines = infile.readlines() + except FileNotFoundError: + print(f"Error: Input file '{args.input}' not found") + return + + from utils.client import MemuClient + + mem_client = MemuClient() + + if args.mode == "add": + print(f"Running in 'add' mode. Ingesting memories from '{args.input}'...") + print(f"Adding {args.add_turn} irrelevant turns.") + print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit( + add_memory_for_line, + (i, line), + mem_client, + args.add_turn, + args.lib, + args.version, + ) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Adding memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print(f"\n'add' mode complete! Data with user_id written to '{args.output}'.") + + elif args.mode == "search": + print(f"Running in 'search' mode. 
Searching memories based on '{args.input}'...") + print(f"Retrieving top {args.top_k} memories for each query.") + print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(search_memory_for_line, (i, line), mem_client, args.top_k) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Searching memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print( + f"\n'search' mode complete! Results with retrieved memories written to '{args.output}'." + ) + + elif args.mode == "response": + print(f"Running in 'response' mode. Generating responses based on '{args.input}'...") + print(f"Using {args.max_workers} workers.") + openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url=BASE_URL) + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(generate_response_for_line, (i, line), openai_client) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Generating responses...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print(f"\n'response' mode complete! Final results written to '{args.output}'.") + + +if __name__ == "__main__": + main() diff --git a/evaluation/scripts/PrefEval/pref_supermemory.py b/evaluation/scripts/PrefEval/pref_supermemory.py new file mode 100644 index 000000000..85e84b6c9 --- /dev/null +++ b/evaluation/scripts/PrefEval/pref_supermemory.py @@ -0,0 +1,334 @@ +import argparse +import concurrent.futures +import json +import os +import sys +import time +import tiktoken +from dotenv import load_dotenv +from openai import OpenAI +from tqdm import tqdm +from datetime import datetime +from irrelevant_conv import irre_10, irre_300 + +ROOT_DIR = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) +EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts") + +sys.path.insert(0, ROOT_DIR) +sys.path.insert(0, EVAL_SCRIPTS_DIR) +load_dotenv() +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +BASE_URL = os.getenv("OPENAI_BASE_URL") +MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini") +tokenizer = tiktoken.get_encoding("cl100k_base") + + +def add_memory_for_line( + line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str +) -> dict: + """ + Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
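+    Ingestion latency is recorded under metrics["add_memories_duration_seconds"] in the returned record.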
+ """ + i, line = line_data + user_id = f"{lib}_user_pref_eval_{i}_{version}" + + try: + original_data = json.loads(line) + conversation = original_data.get("conversation", []) + + if num_irrelevant_turns == 10: + conversation = conversation + irre_10 + elif num_irrelevant_turns == 300: + conversation = conversation + irre_300 + + turns_add = 5 + start_time_add = time.monotonic() + if conversation: + if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": + for chunk_start in range(0, len(conversation), turns_add * 2): + chunk = conversation[chunk_start : chunk_start + turns_add * 2] + mem_client.add(messages=chunk, user_id=user_id) + else: + mem_client.add(messages=conversation, user_id=user_id) + end_time_add = time.monotonic() + add_duration = end_time_add - start_time_add + + original_data["user_id"] = user_id + original_data["metrics"] = {"add_memories_duration_seconds": add_duration} + return original_data + + except Exception as e: + print(f"Error adding memory for line {i + 1} (user_id: {user_id}): {e}") + return None + + +def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> dict: + """ + Processes a single line of data, searching memory based on the question. + """ + i, line = line_data + try: + original_data = json.loads(line) + + user_id = original_data.get("user_id") + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + + if not user_id: + original_data["error"] = ( + "Error: user_id not found in this line. Please run 'add' mode first." + ) + return original_data + if not question: + original_data["error"] = "Question not found in this line." + return original_data + + start_time_search = time.monotonic() + relevant_memories = mem_client.search(query=question, user_id=user_id, top_k=top_k_value) + search_memories_duration = time.monotonic() - start_time_search + memories_str = "\n".join( + f"- {entry.get('memory', '')}" for entry in relevant_memories["text_mem"][0]["memories"] + ) + + memory_tokens_used = len(tokenizer.encode(memories_str)) + + metrics_dict.update( + { + "search_memories_duration_seconds": search_memories_duration, + "memory_tokens_used": memory_tokens_used, + "retrieved_memories_text": memories_str, + } + ) + original_data["metrics"] = metrics_dict + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error searching memory for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def generate_response_for_line(line_data: tuple, openai_client: OpenAI) -> dict: + """ + Generates a response for a single line of data using pre-fetched memories. + """ + i, line = line_data + try: + original_data = json.loads(line) + + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + memories_str = metrics_dict.get("retrieved_memories_text") + + # If an error occurred in 'add' or 'search' mode, just pass the line through + if original_data.get("error"): + return original_data + + if not question: + original_data["error"] = "Question not found in this line." + return original_data + + # Check for None, as an empty string (no memories found) is a valid result + if memories_str is None: + original_data["error"] = ( + "Error: retrieved_memories_text not found in metrics. " + "Please run 'search' mode first." + ) + return original_data + + system_prompt = f"You are a helpful AI. 
Answer the question based on the query and the following memories:\nUser Memories:\n{memories_str}" + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": question}, + ] + + response = openai_client.chat.completions.create(model=MODEL_NAME, messages=messages) + assistant_response = response.choices[0].message.content + original_data["response"] = assistant_response + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error generating response for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def main(): + parser = argparse.ArgumentParser( + description="Process conversations with MemOS. Run 'add', then 'search', then 'response'." + ) + parser.add_argument( + "mode", + choices=["add", "search", "response"], + help="The mode to run the script in ('add', 'search', or 'response').", + ) + parser.add_argument("--input", required=True, help="Path to the input JSONL file.") + parser.add_argument("--output", required=True, help="Path to the output JSONL file.") + parser.add_argument( + "--top-k", + type=int, + default=10, + help="Number of memories to retrieve (used in 'search' mode).", + ) + parser.add_argument( + "--add-turn", + type=int, + choices=[0, 10, 300], + default=0, + help="Number of irrelevant turns to add (used in 'add' mode).", + ) + parser.add_argument( + "--lib", + type=str, + choices=["supermemory"], + default="supermemory", + help="Which Supermemory library to use (used in 'add' mode).", + ) + parser.add_argument( + "--version", + type=str, + default="0929-1", + help="Version identifier for user_id generation (used in 'add' mode).", + ) + parser.add_argument( + "--max-workers", type=int, default=20, help="Maximum number of concurrent workers." + ) + + args = parser.parse_args() + + try: + with open(args.input, "r", encoding="utf-8") as infile: + lines = infile.readlines() + except FileNotFoundError: + print(f"Error: Input file '{args.input}' not found") + return + + class SupermemoryClient: + def __init__(self): + from supermemory import Supermemory + + self.client = Supermemory(api_key=os.getenv("SUPERMEMORY_API_KEY")) + + def add(self, messages, user_id): + content = "\n".join([f"{msg['role']}: {msg['content']}" for msg in messages]) + max_retries = 5 + for attempt in range(max_retries): + try: + self.client.memories.add(content=content, container_tag=user_id) + break + except Exception as e: + if attempt < max_retries - 1: + time.sleep(2**attempt) + else: + raise e + + def search(self, query, user_id, top_k): + max_retries = 10 + for attempt in range(max_retries): + try: + results = self.client.search.memories( + q=query, + container_tag=user_id, + threshold=0, + rerank=True, + rewrite_query=True, + limit=top_k, + ) + context = "\n\n".join([r.memory for r in results.results]) + return context + except Exception as e: + if attempt < max_retries - 1: + time.sleep(2**attempt) + else: + raise e + + mem_client = SupermemoryClient() + + if args.mode == "add": + print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") + print(f"Adding {args.add_turn} irrelevant turns.") + print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit( + add_memory_for_line, + (i, line), + mem_client, + args.add_turn, + args.lib, + args.version, + ) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Adding memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print(f"\n'add' mode complete! Data with user_id written to '{args.output}'.") + + elif args.mode == "search": + print(f"Running in 'search' mode. Searching memories based on '{args.input}'...") + print(f"Retrieving top {args.top_k} memories for each query.") + print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(search_memory_for_line, (i, line), mem_client, args.top_k) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Searching memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print( + f"\n'search' mode complete! Results with retrieved memories written to '{args.output}'." + ) + + elif args.mode == "response": + print(f"Running in 'response' mode. Generating responses based on '{args.input}'...") + print(f"Using {args.max_workers} workers.") + openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url=BASE_URL) + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(generate_response_for_line, (i, line), openai_client) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Generating responses...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print(f"\n'response' mode complete! 
Final results written to '{args.output}'.") + + +if __name__ == "__main__": + main() diff --git a/evaluation/scripts/PrefEval/pref_zep.py b/evaluation/scripts/PrefEval/pref_zep.py new file mode 100644 index 000000000..699660787 --- /dev/null +++ b/evaluation/scripts/PrefEval/pref_zep.py @@ -0,0 +1,307 @@ +import argparse +import concurrent.futures +import json +import os +import sys +import time +import tiktoken +from dotenv import load_dotenv +from openai import OpenAI +from tqdm import tqdm +from datetime import datetime +from irrelevant_conv import irre_10, irre_300 + +ROOT_DIR = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) +EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts") + +sys.path.insert(0, ROOT_DIR) +sys.path.insert(0, EVAL_SCRIPTS_DIR) +load_dotenv() +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +BASE_URL = os.getenv("OPENAI_BASE_URL") +MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini") +tokenizer = tiktoken.get_encoding("cl100k_base") + + +def add_memory_for_line( + line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str +) -> dict: + """ + Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. + """ + i, line = line_data + user_id = f"{lib}_user_pref_eval_{i}_{version}" + + try: + original_data = json.loads(line) + conversation = original_data.get("conversation", []) + + if num_irrelevant_turns == 10: + conversation = conversation + irre_10 + elif num_irrelevant_turns == 300: + conversation = conversation + irre_300 + + turns_add = 5 + start_time_add = time.monotonic() + if conversation: + if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": + for chunk_start in range(0, len(conversation), turns_add * 2): + chunk = conversation[chunk_start : chunk_start + turns_add * 2] + mem_client.add( + messages=chunk, + user_id=user_id, + conv_id=None, + timestamp=datetime.now().isoformat(), + ) + else: + mem_client.add( + messages=conversation, + user_id=user_id, + conv_id=None, + timestamp=datetime.now().isoformat(), + ) + end_time_add = time.monotonic() + add_duration = end_time_add - start_time_add + + original_data["user_id"] = user_id + original_data["metrics"] = {"add_memories_duration_seconds": add_duration} + return original_data + + except Exception as e: + print(f"Error adding memory for line {i + 1} (user_id: {user_id}): {e}") + return None + + +def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> dict: + """ + Processes a single line of data, searching memory based on the question. + """ + i, line = line_data + try: + original_data = json.loads(line) + + user_id = original_data.get("user_id") + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + + if not user_id: + original_data["error"] = ( + "Error: user_id not found in this line. Please run 'add' mode first." + ) + return original_data + if not question: + original_data["error"] = "Question not found in this line." 
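+            # Record the error; downstream 'response' mode will pass this line through without generating.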
+ return original_data + + start_time_search = time.monotonic() + relevant_memories = mem_client.search(query=question, user_id=user_id, top_k=top_k_value) + search_memories_duration = time.monotonic() - start_time_search + memories_str = "\n".join( + f"- {entry.get('memory', '')}" for entry in relevant_memories["text_mem"][0]["memories"] + ) + + memory_tokens_used = len(tokenizer.encode(memories_str)) + + metrics_dict.update( + { + "search_memories_duration_seconds": search_memories_duration, + "memory_tokens_used": memory_tokens_used, + "retrieved_memories_text": memories_str, + } + ) + original_data["metrics"] = metrics_dict + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error searching memory for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def generate_response_for_line(line_data: tuple, openai_client: OpenAI) -> dict: + """ + Generates a response for a single line of data using pre-fetched memories. + """ + i, line = line_data + try: + original_data = json.loads(line) + + question = original_data.get("question") + metrics_dict = original_data.get("metrics", {}) + memories_str = metrics_dict.get("retrieved_memories_text") + + # If an error occurred in 'add' or 'search' mode, just pass the line through + if original_data.get("error"): + return original_data + + if not question: + original_data["error"] = "Question not found in this line." + return original_data + + # Check for None, as an empty string (no memories found) is a valid result + if memories_str is None: + original_data["error"] = ( + "Error: retrieved_memories_text not found in metrics. " + "Please run 'search' mode first." + ) + return original_data + + system_prompt = f"You are a helpful AI. Answer the question based on the query and the following memories:\nUser Memories:\n{memories_str}" + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": question}, + ] + + response = openai_client.chat.completions.create(model=MODEL_NAME, messages=messages) + assistant_response = response.choices[0].message.content + original_data["response"] = assistant_response + + return original_data + + except Exception as e: + user_id_from_data = json.loads(line).get("user_id", "N/A") + print(f"Error generating response for line {i + 1} (user_id: {user_id_from_data}): {e}") + return None + + +def main(): + parser = argparse.ArgumentParser( + description="Process conversations with MemOS. Run 'add', then 'search', then 'response'." 
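+        # Run modes in order: 'search' needs the user_id written by 'add'; 'response' needs the memories from 'search'.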
+ ) + parser.add_argument( + "mode", + choices=["add", "search", "response"], + help="The mode to run the script in ('add', 'search', or 'response').", + ) + parser.add_argument("--input", required=True, help="Path to the input JSONL file.") + parser.add_argument("--output", required=True, help="Path to the output JSONL file.") + parser.add_argument( + "--top-k", + type=int, + default=10, + help="Number of memories to retrieve (used in 'search' mode).", + ) + parser.add_argument( + "--add-turn", + type=int, + choices=[0, 10, 300], + default=0, + help="Number of irrelevant turns to add (used in 'add' mode).", + ) + parser.add_argument( + "--lib", + type=str, + choices=["zep"], + default="zep", + help="Which Zep library to use (used in 'add' mode).", + ) + parser.add_argument( + "--version", + type=str, + default="0929-1", + help="Version identifier for user_id generation (used in 'add' mode).", + ) + parser.add_argument( + "--max-workers", type=int, default=20, help="Maximum number of concurrent workers." + ) + + args = parser.parse_args() + + try: + with open(args.input, "r", encoding="utf-8") as infile: + lines = infile.readlines() + except FileNotFoundError: + print(f"Error: Input file '{args.input}' not found") + return + + from utils.client import ZepClient + + mem_client = ZepClient() + + if args.mode == "add": + print(f"Running in 'add' mode. Ingesting memories from '{args.input}'...") + print(f"Adding {args.add_turn} irrelevant turns.") + print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit( + add_memory_for_line, + (i, line), + mem_client, + args.add_turn, + args.lib, + args.version, + ) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Adding memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print(f"\n'add' mode complete! Data with user_id written to '{args.output}'.") + + elif args.mode == "search": + print(f"Running in 'search' mode. Searching memories based on '{args.input}'...") + print(f"Retrieving top {args.top_k} memories for each query.") + print(f"Using {args.max_workers} workers.") + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(search_memory_for_line, (i, line), mem_client, args.top_k) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Searching memories...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print( + f"\n'search' mode complete! Results with retrieved memories written to '{args.output}'." + ) + + elif args.mode == "response": + print(f"Running in 'response' mode. 
Generating responses based on '{args.input}'...") + print(f"Using {args.max_workers} workers.") + openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url=BASE_URL) + with ( + open(args.output, "w", encoding="utf-8") as outfile, + concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + ): + futures = [ + executor.submit(generate_response_for_line, (i, line), openai_client) + for i, line in enumerate(lines) + ] + + pbar = tqdm( + concurrent.futures.as_completed(futures), + total=len(lines), + desc="Generating responses...", + ) + for future in pbar: + result = future.result() + if result: + outfile.write(json.dumps(result, ensure_ascii=False) + "\n") + print(f"\n'response' mode complete! Final results written to '{args.output}'.") + + +if __name__ == "__main__": + main() diff --git a/evaluation/scripts/locomo/locomo_ingestion.py b/evaluation/scripts/locomo/locomo_ingestion.py index 2a177a52a..edb451dc0 100644 --- a/evaluation/scripts/locomo/locomo_ingestion.py +++ b/evaluation/scripts/locomo/locomo_ingestion.py @@ -103,12 +103,8 @@ def process_user(conv_idx, frame, locomo_df, version): from utils.client import MemobaseClient client = MemobaseClient() - all_users = client.client.get_all_users(limit=5000) - for user in all_users: - if user["additional_fields"]["user_id"] in [speaker_a_user_id, speaker_b_user_id]: - client.client.delete_user(user["id"]) - speaker_a_user_id = client.client.add_user({"user_id": speaker_a_user_id}) - speaker_b_user_id = client.client.add_user({"user_id": speaker_b_user_id}) + client.delete_user(speaker_a_user_id) + client.delete_user(speaker_b_user_id) elif frame == "memu": from utils.client import MemuClient @@ -193,7 +189,7 @@ def main(frame, version="default", num_workers=4): parser.add_argument( "--version", type=str, - default="default1", + default="default", help="Version identifier for saving results (e.g., 1010)", ) parser.add_argument( diff --git a/evaluation/scripts/locomo/locomo_search.py b/evaluation/scripts/locomo/locomo_search.py index d976b8f67..452fb4762 100644 --- a/evaluation/scripts/locomo/locomo_search.py +++ b/evaluation/scripts/locomo/locomo_search.py @@ -254,12 +254,6 @@ def process_user(conv_idx, locomo_df, frame, version, top_k=20, num_workers=1): from utils.client import MemobaseClient client = MemobaseClient() - users = client.client.get_all_users(limit=5000) - for u in users: - if u["additional_fields"]["user_id"] == speaker_a_user_id: - speaker_a_user_id = u["id"] - if u["additional_fields"]["user_id"] == speaker_b_user_id: - speaker_b_user_id = u["id"] elif frame == "memu": from utils.client import MemuClient @@ -348,7 +342,7 @@ def main(frame, version="default", num_workers=1, top_k=20): "--workers", type=int, default=5, help="Number of parallel workers to process users" ) parser.add_argument( - "--top_k", type=int, default=20, help="Number of results to retrieve in search queries" + "--top_k", type=int, default=15, help="Number of results to retrieve in search queries" ) args = parser.parse_args() lib = args.lib diff --git a/evaluation/scripts/longmemeval/lme_ingestion.py b/evaluation/scripts/longmemeval/lme_ingestion.py index 6e9bd5ab4..a1849757d 100644 --- a/evaluation/scripts/longmemeval/lme_ingestion.py +++ b/evaluation/scripts/longmemeval/lme_ingestion.py @@ -80,11 +80,7 @@ def ingest_conv(lme_df, version, conv_idx, frame, success_records, f): from utils.client import MemobaseClient client = MemobaseClient() - all_users = client.client.get_all_users(limit=5000) - for user in all_users: - if 
user["additional_fields"]["user_id"] == user_id: - client.client.delete_user(user["id"]) - user_id = client.client.add_user({"user_id": user_id}) + client.delete_user(user_id) elif frame == "memu": from utils.client import MemuClient diff --git a/evaluation/scripts/longmemeval/lme_search.py b/evaluation/scripts/longmemeval/lme_search.py index a24c0eaf5..67d2f1b04 100644 --- a/evaluation/scripts/longmemeval/lme_search.py +++ b/evaluation/scripts/longmemeval/lme_search.py @@ -114,10 +114,6 @@ def process_user(lme_df, conv_idx, frame, version, top_k=20): from utils.client import MemobaseClient client = MemobaseClient() - users = client.client.get_all_users(limit=5000) - for u in users: - if u["additional_fields"]["user_id"] == user_id: - user_id = u["id"] context, duration_ms = memobase_search(client, question, user_id, top_k) elif frame == "memos-api": from utils.client import MemosApiClient diff --git a/evaluation/scripts/personamem/pm_ingestion.py b/evaluation/scripts/personamem/pm_ingestion.py index 5cd9d38a6..8de23937c 100644 --- a/evaluation/scripts/personamem/pm_ingestion.py +++ b/evaluation/scripts/personamem/pm_ingestion.py @@ -1,48 +1,64 @@ import argparse -import os -import sys import csv import json - -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import os +import sys from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime - from tqdm import tqdm -from utils.client import mem0_client,zep_client,memos_api_client -from zep_cloud.types import Message +import time + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) def ingest_session(session, user_id, session_id, frame, client): messages = [] if frame == "zep": pass + elif "mem0" in frame: for idx, msg in enumerate(session): + messages.append({"role": msg["role"], "content": msg["content"][:8000]}) print( - f"[{frame}] πŸ’¬ Session [{session_id}: [{idx + 1}/{len(session)}] Ingesting message: {msg['role']} - {msg['content'][:50]}...") - client.memory.add(messages=[Message(role=msg["role"], role_type=msg["role"], content=msg["content"], )], ) - elif frame == "mem0-local" or frame == "mem0-api": - for idx, msg in enumerate(session): - messages.append({"role": msg["role"], "content": msg["content"]}) - print( - f"[{frame}] πŸ“ Session [{session_id}: [{idx + 1}/{len(session)}] Ingesting message: {msg['role']} - {msg['content'][:50]}...") - if frame == "mem0-local": - client.add(messages=messages, user_id=user_id) - elif frame == "mem0-api": - client.add(messages=messages, - user_id=user_id, - session_id=session_id, - version="v2", ) + f"[{frame}] πŸ“ Session [{session_id}: [{idx + 1}/{len(session)}] Ingesting message: {msg['role']} - {msg['content'][:50]}..." 
+ ) + timestamp_add = int(time.time() * 100) + client.add(messages=messages, user_id=user_id, timestamp=timestamp_add) print(f"[{frame}] βœ… Session [{session_id}]: Ingested {len(messages)} messages") - elif frame == "memos-local" or frame == "memos-api": - if os.getenv("PRE_SPLIT_CHUNK")=="true": + elif frame == "memos-api": + if os.getenv("PRE_SPLIT_CHUNK") == "true": for i in range(0, len(session), 10): - messages = session[i: i + 10] + messages = session[i : i + 10] client.add(messages=messages, user_id=user_id, conv_id=session_id) print(f"[{frame}] βœ… Session [{session_id}]: Ingested {len(messages)} messages") else: client.add(messages=session, user_id=user_id, conv_id=session_id) print(f"[{frame}] βœ… Session [{session_id}]: Ingested {len(session)} messages") + elif frame == "memobase": + for idx, msg in enumerate(session): + if msg["role"] != "system": + messages.append( + { + "role": msg["role"], + "content": msg["content"][:8000], + "created_at": datetime.now().isoformat(), + } + ) + client.add(messages, user_id) + print(f"[{frame}] βœ… Session [{session_id}]: Ingested {len(messages)} messages") + elif frame == "supermemory": + for _idx, msg in enumerate(session): + messages.append( + { + "role": msg["role"], + "content": msg["content"][:8000], + "chat_time": datetime.now().astimezone().isoformat(), + } + ) + client.add(messages, user_id) + elif frame == "memu": + for _idx, msg in enumerate(session): + messages.append({"role": msg["role"], "content": msg["content"]}) + client.add(messages, user_id, datetime.now().astimezone().isoformat()) def build_jsonl_index(jsonl_path): @@ -51,7 +67,7 @@ def build_jsonl_index(jsonl_path): Assumes each line is a JSON object with a single key-value pair. """ index = {} - with open(jsonl_path, 'r', encoding='utf-8') as f: + with open(jsonl_path, "r", encoding="utf-8") as f: while True: offset = f.tell() line = f.readline() @@ -63,14 +79,14 @@ def build_jsonl_index(jsonl_path): def load_context_by_id(jsonl_path, offset): - with open(jsonl_path, 'r', encoding='utf-8') as f: + with open(jsonl_path, "r", encoding="utf-8") as f: f.seek(offset) item = json.loads(f.readline()) return next(iter(item.values())) def load_rows(csv_path): - with open(csv_path, mode='r', newline='', encoding='utf-8') as csvfile: + with open(csv_path, mode="r", newline="", encoding="utf-8") as csvfile: reader = csv.DictReader(csvfile) for _, row in enumerate(reader, start=1): row_data = {} @@ -82,7 +98,7 @@ def load_rows(csv_path): def load_rows_with_context(csv_path, jsonl_path): jsonl_index = build_jsonl_index(jsonl_path) - with open(csv_path, mode='r', newline='', encoding='utf-8') as csvfile: + with open(csv_path, mode="r", newline="", encoding="utf-8") as csvfile: reader = csv.DictReader(csvfile) prev_sid = None prev_context = None @@ -102,13 +118,13 @@ def load_rows_with_context(csv_path, jsonl_path): def count_csv_rows(csv_path): - with open(csv_path, mode='r', newline='', encoding='utf-8') as f: + with open(csv_path, mode="r", newline="", encoding="utf-8") as f: return sum(1 for _ in f) - 1 def ingest_conv(row_data, context, version, conv_idx, frame): end_index_in_shared_context = row_data["end_index_in_shared_context"] - context = context[:int(end_index_in_shared_context)] + context = context[: int(end_index_in_shared_context)] user_id = f"pm_exper_user_{conv_idx}_{version}" print(f"πŸ‘€ User ID: {user_id}") print("\n" + "=" * 80) @@ -116,38 +132,45 @@ def ingest_conv(row_data, context, version, conv_idx, frame): print("=" * 80) if frame == "zep": - client = 
zep_client() + from utils.client import ZepClient + + client = ZepClient() print("πŸ”Œ Using Zep client for ingestion...") client.user.delete(user_id) print(f"πŸ—‘οΈ Deleted existing user {user_id} from Zep memory...") client.user.add(user_id=user_id) print(f"βž• Added user {user_id} to Zep memory...") - elif frame == "mem0-local": - client = mem0_client(mode="local") - print("πŸ”Œ Using Mem0 Local client for ingestion...") - client.delete_all(user_id=user_id) + elif frame == "mem0" or frame == "mem0_graph": + from utils.client import Mem0Client + + client = Mem0Client(enable_graph="graph" in frame) + print("πŸ”Œ Using Mem0 client for ingestion...") + client.client.delete_all(user_id=user_id) print(f"πŸ—‘οΈ Deleted existing memories for user {user_id}...") - elif frame == "mem0-api": - client = mem0_client(mode="api") - print("πŸ”Œ Using Mem0 API client for ingestion...") - client.delete_all(user_id=user_id) - elif frame == "memos-local": - client = memos_client( - mode="local", - db_name=f"pm_{frame}-{version}", - user_id=user_id, - top_k=20, - mem_cube_path=f"results/pm/{frame}-{version}/storages/{user_id}", - mem_cube_config_path="configs/mu_mem_cube_config.json", - mem_os_config_path="configs/mos_memos_config.json", - addorsearch="add", - ) - print("πŸ”Œ Using Memos Local client for ingestion...") elif frame == "memos-api": - client = memos_api_client() + from utils.client import MemosApiClient + + client = MemosApiClient() + elif frame == "memobase": + from utils.client import MemobaseClient + + client = MemobaseClient() + print("πŸ”Œ Using Memobase client for ingestion...") + client.delete_user(user_id) + elif frame == "supermemory": + from utils.client import SupermemoryClient - ingest_session(session=context, user_id=user_id, session_id=conv_idx, frame=frame, client=client) + client = SupermemoryClient() + elif frame == "memu": + from utils.client import MemuClient + + client = MemuClient() + + ingest_session( + session=context, user_id=user_id, session_id=conv_idx, frame=frame, client=client + ) print(f"βœ… Ingestion of conversation {conv_idx} completed") print("=" * 80) @@ -170,16 +193,25 @@ with ThreadPoolExecutor(max_workers=num_workers) as executor: future_to_idx = { - executor.submit(ingest_conv, row_data=row_data, context=context, version=version, conv_idx=idx, - frame=frame, ): idx - for idx, (row_data, context) in enumerate(all_data)} - - for future in tqdm(as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations"): + executor.submit( + ingest_conv, + row_data=row_data, + context=context, + version=version, + conv_idx=idx, + frame=frame, + ): idx + for idx, (row_data, context) in enumerate(all_data) + } + + for future in tqdm( + as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations" + ): idx = future_to_idx[future] try: future.result() except Exception as exc: - print(f'\n❌ Conversation {idx} generated an exception: {exc}') + print(f"\n❌ Conversation {idx} generated an exception: {exc}") end_time = datetime.now() elapsed_time = end_time - start_time @@ -195,10 +227,18 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="PersonaMem Ingestion Script") - parser.add_argument("--lib", type=str, choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep"], - default='memos-api') - parser.add_argument("--version", type=str,
default="0925-1", help="Version of the evaluation framework.") - parser.add_argument("--workers", type=int, default=3, help="Number of parallel workers for processing users.") + parser.add_argument( + "--lib", + type=str, + choices=["mem0", "mem0_graph", "memos-api", "memobase", "memu", "supermemory", "zep"], + default="memos-api", + ) + parser.add_argument( + "--version", type=str, default="0925-1", help="Version of the evaluation framework." + ) + parser.add_argument( + "--workers", type=int, default=3, help="Number of parallel workers for processing users." + ) args = parser.parse_args() main(frame=args.lib, version=args.version, num_workers=args.workers) diff --git a/evaluation/scripts/personamem/pm_metric.py b/evaluation/scripts/personamem/pm_metric.py index 0f6a1e138..653c5fc10 100644 --- a/evaluation/scripts/personamem/pm_metric.py +++ b/evaluation/scripts/personamem/pm_metric.py @@ -8,40 +8,48 @@ def save_to_excel(results, output_path): """Save results to Excel file""" combined_data = [] - + # Add overall statistics row - overall_row = {"category": "overall", "accuracy": results["metrics"]["accuracy"], - "accuracy_std": results["metrics"]["accuracy_std"], - "total_questions": results["metrics"]["total_questions"], - "total_runs": results["metrics"]["total_runs"]} + overall_row = { + "category": "overall", + "accuracy": results["metrics"]["accuracy"], + "accuracy_std": results["metrics"]["accuracy_std"], + "total_questions": results["metrics"]["total_questions"], + "total_runs": results["metrics"]["total_runs"], + } # Add response duration metrics for metric, value in results["metrics"]["response_duration"].items(): overall_row[f"response_{metric}"] = value - + # Add search duration metrics (if exists) if "search_duration" in results["metrics"] and results["metrics"]["search_duration"]: for metric, value in results["metrics"]["search_duration"].items(): overall_row[f"search_{metric}"] = value - + combined_data.append(overall_row) - + # Add category statistics rows for category, scores in results["category_scores"].items(): - category_row = {"category": category, "accuracy": scores["accuracy"], "accuracy_std": scores["accuracy_std"], - "total_questions": scores["total_questions"], "total_runs": scores["total_runs"]} + category_row = { + "category": category, + "accuracy": scores["accuracy"], + "accuracy_std": scores["accuracy_std"], + "total_questions": scores["total_questions"], + "total_runs": scores["total_runs"], + } # Add response duration metrics for metric, value in scores["response_duration"].items(): category_row[f"response_{metric}"] = value - + # Add search duration metrics (if exists) if "search_duration" in scores and scores["search_duration"]: for metric, value in scores["search_duration"].items(): category_row[f"search_{metric}"] = value - + combined_data.append(category_row) - + # Save to Excel df = pd.DataFrame(combined_data) df.to_excel(output_path, sheet_name="PersonaMem_Metrics", index=False) @@ -50,62 +58,62 @@ def save_to_excel(results, output_path): def calculate_scores(data, grade_path, output_path): """Calculate PersonaMem evaluation metrics""" - + # Initialize statistics variables category_scores = {} user_metrics = {} - + # Overall metrics - collect accuracy for each run all_response_durations = [] all_search_durations = [] total_questions = 0 - + # For calculating accuracy across multiple runs num_runs = None # Will be determined from first user's data run_accuracies = [] # List to store accuracy for each run across all users - + # Category-wise statistics 
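The statistics gathered here are aggregated runs-first: each run's accuracy is computed across users, and the reported mean and standard deviation are then taken across runs, not across users. A minimal numpy sketch of that aggregation, with made-up correctness flags:

```python
import numpy as np

# rows = users, columns = runs; True means that run answered correctly (made-up data)
per_user_runs = np.array([
    [True, True, False],
    [True, False, True],
    [True, True, True],
])

run_accuracies = per_user_runs.mean(axis=0)  # accuracy of each run across users
accuracy = run_accuracies.mean()             # reported overall accuracy
accuracy_std = run_accuracies.std()          # spread across runs, not across users
print(f"{accuracy:.4f} Β± {accuracy_std:.4f}")
```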
category_response_durations = {} category_search_durations = {} category_run_accuracies = {} # Store accuracy for each run by category - + print(f"πŸ“‹ Processing response data for {len(data)} users...") - + # First pass: determine number of runs and initialize run accuracy arrays for user_id, user_data in data.items(): # Skip incomplete data (users with only topic field) if len(user_data) <= 2 and "topic" in user_data: continue - + results = user_data.get("results", []) if not results: continue - + if num_runs is None: num_runs = len(results) run_accuracies = [[] for _ in range(num_runs)] # Initialize for each run print(f"πŸ“Š Detected {num_runs} runs per user") break - + if num_runs is None: print("❌ Error: Could not determine number of runs from data") return - + # Iterate through all user data for user_id, user_data in data.items(): # Skip incomplete data (users with only topic field) if len(user_data) <= 2 and "topic" in user_data: print(f"⚠️ Skipping incomplete data for user {user_id}") continue - + # Get category and results category = user_data.get("category", "unknown") results = user_data.get("results", []) - + if not results: print(f"⚠️ No results found for user {user_id}") continue - + # Initialize category if not exists if category not in category_scores: category_scores[category] = { @@ -115,39 +123,39 @@ def calculate_scores(data, grade_path, output_path): "accuracy": 0.0, "accuracy_std": 0.0, "response_duration": {}, - "search_duration": {} + "search_duration": {}, } category_response_durations[category] = [] category_search_durations[category] = [] category_run_accuracies[category] = [[] for _ in range(num_runs)] - + # Process each run for this user user_response_durations = [] for run_idx, result in enumerate(results): is_correct = result.get("is_correct", False) - + # Collect accuracy for each run (1 if correct, 0 if not) if run_idx < num_runs: run_accuracies[run_idx].append(1.0 if is_correct else 0.0) category_run_accuracies[category][run_idx].append(1.0 if is_correct else 0.0) - + # Collect response duration response_duration = result.get("response_duration_ms", 0) if response_duration > 0: user_response_durations.append(response_duration) all_response_durations.append(response_duration) category_response_durations[category].append(response_duration) - + # Get search duration (usually same for all runs) search_duration = user_data.get("search_duration_ms", 0) if search_duration > 0: all_search_durations.append(search_duration) category_search_durations[category].append(search_duration) - + # Calculate user-level accuracy (average across runs) user_correct_count = sum(1 for result in results if result.get("is_correct", False)) user_accuracy = user_correct_count / len(results) if results else 0.0 - + # Store user-level metrics user_metrics[user_id] = { "user_id": user_id, @@ -156,22 +164,26 @@ def calculate_scores(data, grade_path, output_path): "accuracy": user_accuracy, "total_runs": len(results), "correct_runs": user_correct_count, - "avg_response_duration_ms": np.mean(user_response_durations) if user_response_durations else 0.0, + "avg_response_duration_ms": np.mean(user_response_durations) + if user_response_durations + else 0.0, "search_duration_ms": search_duration, "golden_answer": user_data.get("golden_answer", ""), - "topic": user_data.get("topic", "") + "topic": user_data.get("topic", ""), } - + # Count statistics total_questions += 1 category_scores[category]["total_questions"] += 1 category_scores[category]["total_runs"] += len(results) - + # Calculate 
overall accuracy and std across runs overall_run_accuracies = [np.mean(run_acc) for run_acc in run_accuracies if run_acc] overall_accuracy = np.mean(overall_run_accuracies) if overall_run_accuracies else 0.0 - overall_accuracy_std = np.std(overall_run_accuracies) if len(overall_run_accuracies) > 1 else 0.0 - + overall_accuracy_std = ( + np.std(overall_run_accuracies) if len(overall_run_accuracies) > 1 else 0.0 + ) + # Calculate response duration statistics response_duration_stats = {} if all_response_durations: @@ -182,9 +194,9 @@ def calculate_scores(data, grade_path, output_path): "p95": np.percentile(all_response_durations, 95), "std": np.std(all_response_durations), "min": np.min(all_response_durations), - "max": np.max(all_response_durations) + "max": np.max(all_response_durations), } - + # Calculate search duration statistics search_duration_stats = {} if all_search_durations: @@ -195,16 +207,22 @@ def calculate_scores(data, grade_path, output_path): "p95": np.percentile(all_search_durations, 95), "std": np.std(all_search_durations), "min": np.min(all_search_durations), - "max": np.max(all_search_durations) + "max": np.max(all_search_durations), } - + # Calculate category-wise metrics for category in category_scores: # Calculate accuracy mean and std across runs for this category - cat_run_accuracies = [np.mean(run_acc) for run_acc in category_run_accuracies[category] if run_acc] - category_scores[category]["accuracy"] = np.mean(cat_run_accuracies) if cat_run_accuracies else 0.0 - category_scores[category]["accuracy_std"] = np.std(cat_run_accuracies) if len(cat_run_accuracies) > 1 else 0.0 - + cat_run_accuracies = [ + np.mean(run_acc) for run_acc in category_run_accuracies[category] if run_acc + ] + category_scores[category]["accuracy"] = ( + np.mean(cat_run_accuracies) if cat_run_accuracies else 0.0 + ) + category_scores[category]["accuracy_std"] = ( + np.std(cat_run_accuracies) if len(cat_run_accuracies) > 1 else 0.0 + ) + # Response duration statistics for this category if category_response_durations[category]: durations = category_response_durations[category] @@ -215,14 +233,19 @@ def calculate_scores(data, grade_path, output_path): "p95": np.percentile(durations, 95), "std": np.std(durations), "min": np.min(durations), - "max": np.max(durations) + "max": np.max(durations), } else: category_scores[category]["response_duration"] = { - "mean": 0.0, "median": 0.0, "p50": 0.0, "p95": 0.0, - "std": 0.0, "min": 0.0, "max": 0.0 + "mean": 0.0, + "median": 0.0, + "p50": 0.0, + "p95": 0.0, + "std": 0.0, + "min": 0.0, + "max": 0.0, } - + # Search duration statistics for this category if category_search_durations[category]: durations = category_search_durations[category] @@ -233,14 +256,19 @@ def calculate_scores(data, grade_path, output_path): "p95": np.percentile(durations, 95), "std": np.std(durations), "min": np.min(durations), - "max": np.max(durations) + "max": np.max(durations), } else: category_scores[category]["search_duration"] = { - "mean": 0.0, "median": 0.0, "p50": 0.0, "p95": 0.0, - "std": 0.0, "min": 0.0, "max": 0.0 + "mean": 0.0, + "median": 0.0, + "p50": 0.0, + "p95": 0.0, + "std": 0.0, + "min": 0.0, + "max": 0.0, } - + # Build final results results = { "metrics": { @@ -249,22 +277,22 @@ def calculate_scores(data, grade_path, output_path): "total_questions": total_questions, "total_runs": total_questions * num_runs if num_runs else 0, "response_duration": response_duration_stats, - "search_duration": search_duration_stats + "search_duration": search_duration_stats, }, 
"category_scores": category_scores, - "user_scores": user_metrics + "user_scores": user_metrics, } - + # Save results to JSON file with open(grade_path, "w") as outfile: json.dump(results, outfile, indent=4, ensure_ascii=False) - + # Save to Excel save_to_excel(results, output_path) - + # Print summary print_summary(results) - + return results @@ -273,19 +301,19 @@ def print_summary(results): print("\n" + "=" * 80) print("πŸ“Š PERSONAMEM EVALUATION SUMMARY".center(80)) print("=" * 80) - + # Overall accuracy accuracy = results["metrics"]["accuracy"] accuracy_std = results["metrics"]["accuracy_std"] total_questions = results["metrics"]["total_questions"] total_runs = results["metrics"]["total_runs"] - + print(f"🎯 Overall Accuracy: {accuracy:.4f} Β± {accuracy_std:.4f}") print(f"πŸ“‹ Total Questions: {total_questions}") print(f"πŸ”„ Total Runs: {total_runs}") - + print("-" * 80) - + # Response duration statistics if results["metrics"]["response_duration"]: rd = results["metrics"]["response_duration"] @@ -294,7 +322,7 @@ def print_summary(results): print(f" P50: \033[96m{rd['p50']:.2f}") print(f" P95: \033[91m{rd['p95']:.2f}") print(f" Std Dev: {rd['std']:.2f}") - + # Search duration statistics if results["metrics"]["search_duration"]: sd = results["metrics"]["search_duration"] @@ -303,9 +331,9 @@ def print_summary(results): print(f" P50: \033[96m{sd['p50']:.2f}") print(f" P95: \033[91m{sd['p95']:.2f}") print(f" Std Dev: {sd['std']:.2f}") - + print("-" * 80) - + # Category-wise accuracy print("πŸ“‚ Category-wise Accuracy:") for category, scores in results["category_scores"].items(): @@ -313,50 +341,47 @@ def print_summary(results): acc_std = scores["accuracy_std"] total_cat = scores["total_questions"] total_runs_cat = scores["total_runs"] - print(f" {category:<35}: {acc:.4f} Β± {acc_std:.4f} ({total_cat} questions, {total_runs_cat} runs)") - + print( + f" {category:<35}: {acc:.4f} Β± {acc_std:.4f} ({total_cat} questions, {total_runs_cat} runs)" + ) + print("=" * 80 + "\n") if __name__ == "__main__": parser = argparse.ArgumentParser(description="PersonaMem evaluation metrics calculation script") parser.add_argument( - "--lib", - type=str, - choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep"], + "--lib", + type=str, + choices=["zep", "mem0", "mem0_graph", "memos-api", "memobase", "memu", "supermemory"], required=True, help="Memory library to evaluate", - default='memos-api' - ) - parser.add_argument( - "--version", - type=str, - default="0925", - help="Evaluation framework version" + default="memos-api", ) - + parser.add_argument("--version", type=str, default="0925", help="Evaluation framework version") + args = parser.parse_args() lib, version = args.lib, args.version - + # Define file paths responses_path = f"results/pm/{lib}-{version}/{lib}_pm_responses.json" grade_path = f"results/pm/{lib}-{version}/{lib}_pm_grades.json" output_path = f"results/pm/{lib}-{version}/{lib}_pm_results.xlsx" - + print(f"πŸ“‚ Loading response data from: {responses_path}") - + try: - with open(responses_path, 'r', encoding='utf-8') as file: + with open(responses_path, "r", encoding="utf-8") as file: data = json.load(file) - + # Calculate metrics results = calculate_scores(data, grade_path, output_path) - + print(f"πŸ“ Results saved to: {grade_path}") print(f"πŸ“Š Excel report saved to: {output_path}") - + except FileNotFoundError: print(f"❌ Error: File not found {responses_path}") print("Please make sure to run pm_responses.py first to generate response data") except Exception as e: - print(f"❌ 
Error occurred during processing: {e}") \ No newline at end of file + print(f"❌ Error occurred during processing: {e}") diff --git a/evaluation/scripts/personamem/pm_responses.py b/evaluation/scripts/personamem/pm_responses.py index c48933c11..8bfeaf5f6 100644 --- a/evaluation/scripts/personamem/pm_responses.py +++ b/evaluation/scripts/personamem/pm_responses.py @@ -19,11 +19,11 @@ def extract_choice_answer(predicted_answer, correct_answer): def _extract_only_options(text): text = text.lower() - in_parens = re.findall(r'\(([a-d])\)', text) + in_parens = re.findall(r"\(([a-d])\)", text) if in_parens: return set(in_parens) else: - return set(re.findall(r'\b([a-d])\b', text)) + return set(re.findall(r"\b([a-d])\b", text)) correct = correct_answer.lower().strip("() ") @@ -33,7 +33,7 @@ def _extract_only_options(text): if "<answer>" in predicted_answer: predicted_answer = predicted_answer.split("<answer>")[-1].strip() if predicted_answer.endswith("</answer>"): - predicted_answer = predicted_answer[:-len("</answer>")].strip() + predicted_answer = predicted_answer[: -len("</answer>")].strip() pred_options = _extract_only_options(predicted_answer) @@ -79,12 +79,14 @@ def process_qa(user_id, search_result, num_runs, llm_client): is_correct, answer = extract_choice_answer(answer, search_result.get("golden_answer", "")) response_duration_ms = (time() - start) * 1000 - run_results.append({ - "run_id": idx + 1, - "answer": answer, - "is_correct": is_correct, - "response_duration_ms": response_duration_ms, - }) + run_results.append( + { + "run_id": idx + 1, + "answer": answer, + "is_correct": is_correct, + "response_duration_ms": response_duration_ms, + } + ) response_duration_ms = sum(result["response_duration_ms"] for result in run_results) / num_runs @@ -95,8 +97,11 @@ def process_qa(user_id, search_result, num_runs, llm_client): print(f"πŸ’‘ Golden Answer: {search_result.get('golden_answer', 'N/A')}") for idx, result in enumerate(run_results, start=1): print(f"\nπŸ”„ Run {idx}/{num_runs}:") - print(f"πŸ’¬ Run Answer: {result['answer'][:150]}..." if len( - result['answer']) > 150 else f"πŸ’¬ Run Answer: {result['answer']}") + print( + f"πŸ’¬ Run Answer: {result['answer'][:150]}..."
+ if len(result["answer"]) > 150 + else f"πŸ’¬ Run Answer: {result['answer']}" + ) print(f"βœ… Run Is Correct: {result['is_correct']}") print(f"⏱️ Run Duration: {result['response_duration_ms']:.2f} ms") print("-" * 80) @@ -122,7 +127,9 @@ def main(frame, version, num_runs=3, num_workers=4): load_dotenv() - oai_client = OpenAI(api_key=os.getenv("CHAT_MODEL_API_KEY"), base_url=os.getenv("CHAT_MODEL_BASE_URL")) + oai_client = OpenAI( + api_key=os.getenv("CHAT_MODEL_API_KEY"), base_url=os.getenv("CHAT_MODEL_BASE_URL") + ) print(f"πŸ”Œ Using OpenAI client with model: {os.getenv('CHAT_MODEL')}") search_path = f"results/pm/{frame}-{version}/{frame}_pm_search_results.json" @@ -146,9 +153,9 @@ def main(frame, version, num_runs=3, num_workers=4): future_to_user_id[future] = user_id for future in tqdm( - as_completed(future_to_user_id), - total=len(future_to_user_id), - desc="πŸ“ Generating responses", + as_completed(future_to_user_id), + total=len(future_to_user_id), + desc="πŸ“ Generating responses", ): user_id = future_to_user_id[future] try: @@ -177,10 +184,21 @@ def main(frame, version, num_runs=3, num_workers=4): if __name__ == "__main__": parser = argparse.ArgumentParser(description="PersonaMem Response Generation Script") - parser.add_argument("--lib", type=str, choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep"], default='memos-api') - parser.add_argument("--version", type=str, default="0925", help="Version of the evaluation framework.") - parser.add_argument("--num_runs", type=int, default=3, help="Number of runs for LLM-as-a-Judge evaluation.") - parser.add_argument("--workers", type=int, default=3, help="Number of worker threads to use for processing.") + parser.add_argument( + "--lib", + type=str, + choices=["zep", "mem0", "mem0_graph", "memos-api", "memobase", "memu", "supermemory"], + default="memos-api", + ) + parser.add_argument( + "--version", type=str, default="0925", help="Version of the evaluation framework." + ) + parser.add_argument( + "--num_runs", type=int, default=3, help="Number of runs for LLM-as-a-Judge evaluation." + ) + parser.add_argument( + "--workers", type=int, default=3, help="Number of worker threads to use for processing." 
+ ) args = parser.parse_args() main(frame=args.lib, version=args.version, num_runs=args.num_runs, num_workers=args.workers) diff --git a/evaluation/scripts/personamem/pm_search.py b/evaluation/scripts/personamem/pm_search.py index 50f46f692..2e1a268fc 100644 --- a/evaluation/scripts/personamem/pm_search.py +++ b/evaluation/scripts/personamem/pm_search.py @@ -2,17 +2,15 @@ import json import os import sys - -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from time import time - +from tqdm import tqdm import csv -from tqdm import tqdm -from utils.client import mem0_client,zep_client,memos_api_client +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + from utils.prompts import ( MEM0_CONTEXT_TEMPLATE, MEM0_GRAPH_CONTEXT_TEMPLATE, @@ -50,93 +48,68 @@ def zep_search(client, user_id, query, top_k=20): return context, duration_ms -def mem0_search(client, user_id, query, top_k=20, enable_graph=False, frame="mem0-api"): +def mem0_search(client, query, user_id, top_k): start = time() - - if frame == "mem0-local": - results = client.search( - query=query, - user_id=user_id, - top_k=top_k, - ) - search_memories = "\n".join( + results = client.search(query, user_id, top_k) + memory = "\n".join(f"{item['created_at']}: {item['memory']}" for item in results["results"]) + if client.enable_graph: + graph = "\n".join( [ - f" - {item['memory']} (date: {item['metadata']['timestamp']})" - for item in results["results"] + f" - 'source': {item.get('source', '?')} -> 'target': {item.get('target', '?')} " + f"(relationship: {item.get('relationship', '?')})" + for item in results.get("relations", []) ] ) - search_graph = ( - "\n".join( - [ - f" - 'source': {item.get('source', '?')} -> 'target': {item.get('destination', '?')} (relationship: {item.get('relationship', '?')})" - for item in results.get("relations", []) - ] - ) - if enable_graph - else "" - ) - - elif frame == "mem0-api": - results = client.search( - query=query, - user_id=user_id, - top_k=top_k, - version="v2", - output_format="v1.1", - enable_graph=enable_graph, - filters={"AND": [{"user_id": user_id}, {"run_id": "*"}]}, - ) - search_memories = "\n".join( - [f" - {item['memory']} (date: {item['created_at']})" for item in results["results"]] - ) - search_graph = ( - "\n".join( - [ - f" - 'source': {item.get('source', '?')} -> 'target': {item.get('target', '?')} (relationship: {item.get('relationship', '?')})" - for item in results.get("relations", []) - ] - ) - if enable_graph - else "" - ) - - if enable_graph: + if client.enable_graph: context = MEM0_GRAPH_CONTEXT_TEMPLATE.format( - user_id=user_id, memories=search_memories, relations=search_graph + user_id=user_id, memories=memory, relations=graph ) else: - context = MEM0_CONTEXT_TEMPLATE.format(user_id=user_id, memories=search_memories) + context = MEM0_CONTEXT_TEMPLATE.format(user_id=user_id, memories=memory) duration_ms = (time() - start) * 1000 return context, duration_ms -def memos_search(client, user_id, query, top_k, frame="memos-local"): +def memobase_search(client, query, user_id, top_k): start = time() - if frame == "memos-local": - results = client.search( - query=query, - user_id=user_id, - ) + context = client.search(query=query, user_id=user_id, top_k=top_k) + duration_ms = (time() - start) * 1000 + return context, duration_ms - results = filter_memory_data(results)["text_mem"][0]["memories"] - search_memories = "\n".join([f" - 
{item['memory']}" for item in results]) - elif frame == "memos-api": - results = client.search(query=query, user_id=user_id, top_k=top_k) - search_memories = "\n".join(f"- {entry.get('memory_value', '')}" - for entry in results.get("memory_detail_list", [])) +def memos_search(client, user_id, query, top_k): + start = time() + results = client.search(query=query, user_id=user_id, top_k=top_k) + search_memories = "\n".join( + item["memory"] for cube in results["text_mem"] for item in cube["memories"] + ) context = MEMOS_CONTEXT_TEMPLATE.format(user_id=user_id, memories=search_memories) duration_ms = (time() - start) * 1000 return context, duration_ms +def supermemory_search(client, query, user_id, top_k): + start = time() + context = client.search(query, user_id, top_k) + duration_ms = (time() - start) * 1000 + return context, duration_ms + + +def memu_search(client, query, user_id, top_k): + start = time() + results = client.search(query, user_id, top_k) + context = "\n".join(results) + duration_ms = (time() - start) * 1000 + return context, duration_ms + + def build_jsonl_index(jsonl_path): """ Scan the JSONL file once to build a mapping: {key: file_offset}. Assumes each line is a JSON object with a single key-value pair. """ index = {} - with open(jsonl_path, 'r', encoding='utf-8') as f: + with open(jsonl_path, "r", encoding="utf-8") as f: while True: offset = f.tell() line = f.readline() @@ -148,14 +121,14 @@ def load_context_by_id(jsonl_path, offset): - with open(jsonl_path, 'r', encoding='utf-8') as f: + with open(jsonl_path, "r", encoding="utf-8") as f: f.seek(offset) item = json.loads(f.readline()) return next(iter(item.values())) def load_rows(csv_path): - with open(csv_path, mode='r', newline='', encoding='utf-8') as csvfile: + with open(csv_path, mode="r", newline="", encoding="utf-8") as csvfile: reader = csv.DictReader(csvfile) for _, row in enumerate(reader, start=1): row_data = {} @@ -167,7 +140,7 @@ def load_rows_with_context(csv_path, jsonl_path): jsonl_index = build_jsonl_index(jsonl_path) - with open(csv_path, mode='r', newline='', encoding='utf-8') as csvfile: + with open(csv_path, mode="r", newline="", encoding="utf-8") as csvfile: reader = csv.DictReader(csvfile) prev_sid = None prev_context = None @@ -190,7 +163,7 @@ def count_csv_rows(csv_path): - with open(csv_path, mode='r', newline='', encoding='utf-8') as f: + with open(csv_path, mode="r", newline="", encoding="utf-8") as f: return sum(1 for _ in f) - 1 @@ -219,35 +192,36 @@ return existing_results if frame == "zep": - client = zep_client() + from utils.client import ZepClient + + client = ZepClient() print("πŸ”Œ Using Zep client for search...") context, duration_ms = zep_search(client, user_id, question) - elif frame == "mem0-local": - client = mem0_client(mode="local") - print("πŸ”Œ Using Mem0 Local client for search...") - context, duration_ms = mem0_search(client, user_id, question, top_k=top_k, frame=frame) - elif frame == "mem0-api": - client = mem0_client(mode="api") + elif frame == "mem0" or frame == "mem0_graph": + from utils.client import Mem0Client + + client = Mem0Client(enable_graph="graph" in frame) print("πŸ”Œ Using Mem0 API client for search...") - context, duration_ms = mem0_search(client, user_id, question, top_k=top_k, frame=frame) - elif frame == "memos-local": - client = memos_client( - mode="local", -
db_name=f"pm_{frame}-{version}", - user_id=user_id, - top_k=top_k, - mem_cube_path=f"results/pm/{frame}-{version}/storages/{user_id}", - mem_cube_config_path="configs/mu_mem_cube_config.json", - mem_os_config_path="configs/mos_memos_config.json", - addorsearch="search", - ) - print("πŸ”Œ Using Memos Local client for search...") - context, duration_ms = memos_search(client, user_id, question, frame=frame) + context, duration_ms = mem0_search(client, question, user_id, top_k) elif frame == "memos-api": - client = memos_api_client() + from utils.client import MemosApiClient + + client = MemosApiClient() print("πŸ”Œ Using Memos API client for search...") - context, duration_ms = memos_search(client, user_id, question, top_k=top_k, frame=frame) + context, duration_ms = memos_search(client, user_id, question, top_k=top_k) + elif frame == "memobase": + from utils.client import MemobaseClient + + client = MemobaseClient() + print("πŸ”Œ Using Memobase client for search...") + context, duration_ms = memobase_search(client, question, user_id, top_k) + elif frame == "supermemory": + from utils.client import SupermemoryClient + + client = SupermemoryClient() + print("πŸ”Œ Using supermemory client for search...") + context, duration_ms = supermemory_search(client, question, user_id, top_k) + elif frame == "memu": + from utils.client import MemuClient + + client = MemuClient() + print("πŸ”Œ Using memu client for search...") + context, duration_ms = memu_search(client, question, user_id, top_k) search_results[user_id].append( { @@ -266,25 +240,23 @@ os.makedirs(f"results/pm/{frame}-{version}/tmp", exist_ok=True) with open( - f"results/pm/{frame}-{version}/tmp/{frame}_pm_search_results_{conv_idx}.json", "w" + f"results/pm/{frame}-{version}/tmp/{frame}_pm_search_results_{conv_idx}.json", "w" ) as f: json.dump(search_results, f, indent=4) - print(f"πŸ’Ύ \033[92mSearch results for conversation {conv_idx} saved...") + print(f"πŸ’Ύ Search results for conversation {conv_idx} saved...") print("-" * 80) return search_results def load_existing_results(frame, version, group_idx): - result_path = ( - f"results/locomo/{frame}-{version}/tmp/{frame}_locomo_search_results_{group_idx}.json" - ) + result_path = f"results/pm/{frame}-{version}/tmp/{frame}_pm_search_results_{group_idx}.json" if os.path.exists(result_path): try: with open(result_path) as f: return json.load(f), True except Exception as e: - print(f"\033[91m❌ Error loading existing results for group {group_idx}: {e}") + print(f"❌ Error loading existing results for group {group_idx}: {e}") return {}, False @@ -299,9 +271,7 @@ def main(frame, version, top_k=20, num_workers=2): print(f"πŸ“š Loaded PersonaMem dataset from {question_csv_path} and {context_jsonl_path}") print(f"πŸ“Š Total conversations: {total_rows}") - print( - f"βš™οΈ Search parameters: top_k={top_k}, workers={num_workers}" - ) + print(f"βš™οΈ Search parameters: top_k={top_k}, workers={num_workers}") print("-" * 80) all_search_results = defaultdict(list) @@ -320,7 +290,9 @@ for idx, (row_data, _) in enumerate(all_data) } - for future in tqdm(as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations"): + for future in tqdm( + as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations" + ): idx = future_to_idx[future] try: search_results = future.result() @@ -328,37 +300,41 @@ all_search_results[user_id].extend(results) print(f"βœ… Conversation {idx} processed successfully.") except Exception as exc: - print(f'\n❌ Conversation {idx} generated an exception: {exc}') + print(f"\n❌ Conversation {idx} 
generated an exception: {exc}") end_time = datetime.now() elapsed_time = end_time - start_time elapsed_time_str = str(elapsed_time).split(".")[0] print("\n" + "=" * 80) - print("βœ… \033[1;32mSEARCH COMPLETE".center(80)) + print("βœ… SEARCH COMPLETE".center(80)) print("=" * 80) - print( - f"⏱️ Total time taken to search {total_rows} users: \033[92m{elapsed_time_str}" - ) - print( - f"πŸ”„ Framework: {frame} | Version: {version} | Workers: {num_workers}" - ) + print(f"⏱️ Total time taken to search {total_rows} users: {elapsed_time_str}") + print(f"πŸ”„ Framework: {frame} | Version: {version} | Workers: {num_workers}") with open(f"results/pm/{frame}-{version}/{frame}_pm_search_results.json", "w") as f: json.dump(dict(all_search_results), f, indent=4) - print( - f"πŸ“ Results saved to: \033[1;94mresults/pm/{frame}-{version}/{frame}_pm_search_results.json" - ) + print(f"πŸ“ Results saved to: results/pm/{frame}-{version}/{frame}_pm_search_results.json") print("=" * 80 + "\n") if __name__ == "__main__": parser = argparse.ArgumentParser(description="PersonaMem Search Script") - parser.add_argument("--lib", type=str, choices=["mem0-local", "mem0-api", "memos-local", "memos-api", "zep"], - default='memos-api') - parser.add_argument("--version", type=str, default="0925", help="Version of the evaluation framework.") - parser.add_argument("--top_k", type=int, default=20, help="Number of top results to retrieve from the search.") - parser.add_argument("--workers", type=int, default=3, help="Number of parallel workers for processing users.") + parser.add_argument( + "--lib", + type=str, + choices=["mem0", "mem0_graph", "memos-api", "memobase", "memu", "supermemory"], + default="memos-api", + ) + parser.add_argument( + "--version", type=str, default="default", help="Version of the evaluation framework." + ) + parser.add_argument( + "--top_k", type=int, default=20, help="Number of top results to retrieve from the search." + ) + parser.add_argument( + "--workers", type=int, default=3, help="Number of parallel workers for processing users." + ) args = parser.parse_args() diff --git a/evaluation/scripts/run_pm_eval.sh b/evaluation/scripts/run_pm_eval.sh index 89484616b..f83893fed 100755 --- a/evaluation/scripts/run_pm_eval.sh +++ b/evaluation/scripts/run_pm_eval.sh @@ -1,41 +1,65 @@ #!/bin/bash # Common parameters for all scripts -LIB="memos-api" -VERSION="072201" +LIB="memu" +VERSION="072202" WORKERS=10 TOPK=20 -# echo "downloading data..." -# export HF_ENDPOINT=https://hf-mirror.com -# huggingface-cli download --repo-type dataset bowen-upenn/PersonaMem --local-dir /mnt/afs/codes/ljl/MemOS/evaluation/data/personamem +if [ "$LIB" = "mirix" ]; then + echo "Running pm_mirix.py 100 times..." + for i in {1..100}; do + echo "Iteration $i/100" + CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_mirix.py --version $VERSION --workers 1 + if [ $? -ne 0 ]; then + echo "Error running pm_mirix.py on iteration $i" + exit 1 + fi + done +elif [ "$LIB" = "zep" ]; then + CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_ingestion_zep.py --version $VERSION --workers $WORKERS + CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_search_zep.py --version $VERSION --top_k $TOPK --workers $WORKERS + echo "Running pm_responses.py..." + CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_responses.py --lib $LIB --version $VERSION --workers $WORKERS + if [ $? -ne 0 ]; then + echo "Error running pm_responses.py" + exit 1 + fi -echo "Running pm_ingestion.py..." 
-CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_ingestion.py --lib $LIB --version $VERSION --workers $WORKERS -if [ $? -ne 0 ]; then - echo "Error running pm_ingestion.py" - exit 1 -fi + echo "Running pm_metric.py..." + CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_metric.py --lib $LIB --version $VERSION + if [ $? -ne 0 ]; then + echo "Error running pm_metric.py" + exit 1 + fi +else + echo "Running pm_ingestion.py..." + CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_ingestion.py --lib $LIB --version $VERSION --workers $WORKERS + if [ $? -ne 0 ]; then + echo "Error running pm_ingestion.py" + exit 1 + fi -echo "Running pm_search.py..." -CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_search.py --lib $LIB --version $VERSION --top_k $TOPK --workers $WORKERS -if [ $? -ne 0 ]; then - echo "Error running pm_search.py" - exit 1 -fi + echo "Running pm_search.py..." + CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_search.py --lib $LIB --version $VERSION --top_k $TOPK --workers $WORKERS + if [ $? -ne 0 ]; then + echo "Error running pm_search.py" + exit 1 + fi -echo "Running pm_responses.py..." -CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_responses.py --lib $LIB --version $VERSION --workers $WORKERS -if [ $? -ne 0 ]; then - echo "Error running pm_responses.py" - exit 1 -fi + echo "Running pm_responses.py..." + CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_responses.py --lib $LIB --version $VERSION --workers $WORKERS + if [ $? -ne 0 ]; then + echo "Error running pm_responses.py" + exit 1 + fi -echo "Running pm_metric.py..." -CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_metric.py --lib $LIB --version $VERSION -if [ $? -ne 0 ]; then - echo "Error running pm_metric.py" - exit 1 + echo "Running pm_metric.py..." + CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_metric.py --lib $LIB --version $VERSION + if [ $? -ne 0 ]; then + echo "Error running pm_metric.py" + exit 1 + fi fi -echo "All scripts completed successfully!" +echo "All scripts completed successfully!" \ No newline at end of file diff --git a/evaluation/scripts/run_prefeval_eval.sh b/evaluation/scripts/run_prefeval_eval.sh index 8e718192a..001f8299d 100644 --- a/evaluation/scripts/run_prefeval_eval.sh +++ b/evaluation/scripts/run_prefeval_eval.sh @@ -9,21 +9,43 @@ WORKERS=10 # Parameters for pref_memos.py -TOP_K=10 +TOP_K=6 ADD_TURN=0 # Options: 0, 10, or 300 -LIB="memos-api" -VERSION="1021-5" +LIB="memos-api" +VERSION="1022-0" # --- File Paths --- # You may need to adjust these paths based on your project structure. 
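For orientation before the path definitions below: each PrefEval stage reads the previous stage's JSONL and adds fields to every record. A hypothetical record as it accumulates fields (field names follow pref_zep.py above; the values are invented):

```python
import json

# Hypothetical illustration of the JSONL handoff between the three modes:
# 'add' stamps a user_id, 'search' attaches retrieved memories, 'response' adds the answer.
record = {"question": "Any weekend reading tips?", "conversation": []}

record["user_id"] = "zep_user_pref_eval_0_1022-0"  # written by 'add' mode
record.setdefault("metrics", {})["retrieved_memories_text"] = "- likes sci-fi novels"  # 'search'
record["response"] = "Since you enjoy sci-fi, you might try ..."  # 'response' mode

print(json.dumps(record, ensure_ascii=False))
```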
-# Assumes Step 1 (preprocess) outputs this file: +# Step 1 (preprocess) outputs this file: PREPROCESSED_FILE="data/prefeval/pref_processed.jsonl" -# Intermediate file (output of 'add' mode, input for 'process' mode) -IDS_FILE="results/prefeval/pref_memos_add.jsonl" +# Create a directory name based on the *specific* LIB (e.g., "memos") +OUTPUT_DIR="results/prefeval/${LIB}_${VERSION}" + + +if [[ "$LIB" == *"mem0"* ]]; then + SCRIPT_NAME_BASE="mem0" +elif [[ "$LIB" == *"memos"* ]]; then + SCRIPT_NAME_BASE="memos" +elif [[ "$LIB" == *"memobase"* ]]; then + SCRIPT_NAME_BASE="memobase" +elif [[ "$LIB" == *"supermemory"* ]]; then + SCRIPT_NAME_BASE="supermemory" +elif [[ "$LIB" == *"memu"* ]]; then + SCRIPT_NAME_BASE="memu" +elif [[ "$LIB" == *"zep"* ]]; then + SCRIPT_NAME_BASE="zep" +else + SCRIPT_NAME_BASE=$LIB +fi + +# The script to be executed (e.g., pref_mem0.py) +LIB_SCRIPT="scripts/PrefEval/pref_${SCRIPT_NAME_BASE}.py" -# Final response file (output of 'process' mode, input for Step 3) -RESPONSE_FILE="results/prefeval/pref_memos_process.jsonl" +# Output files will be unique to the $LIB (e.g., pref_memos-api_add.jsonl) +IDS_FILE="${OUTPUT_DIR}/pref_${LIB}_add.jsonl" +SEARCH_FILE="${OUTPUT_DIR}/pref_${LIB}_search.jsonl" +RESPONSE_FILE="${OUTPUT_DIR}/pref_${LIB}_response.jsonl" # Set the Hugging Face mirror endpoint @@ -31,6 +53,8 @@ export HF_ENDPOINT="https://hf-mirror.com" echo "--- Starting PrefEval Pipeline ---" echo "Configuration: WORKERS=$WORKERS, TOP_K=$TOP_K, ADD_TURN=$ADD_TURN, LIB=$LIB, VERSION=$VERSION, HF_ENDPOINT=$HF_ENDPOINT" +echo "Results will be saved to: $OUTPUT_DIR" +echo "Using script: $LIB_SCRIPT (mapped from LIB=$LIB)" echo "" # --- Step 1: Preprocess the data --- @@ -42,11 +66,29 @@ if [ $? -ne 0 ]; then exit 1 fi -# --- Step 2: Generate responses using MemOS (split into 'add' and 'process') --- +# --- Create output directory --- +echo "" +echo "Creating output directory: $OUTPUT_DIR" +mkdir -p $OUTPUT_DIR +if [ $? -ne 0 ]; then + echo "Error: Could not create output directory '$OUTPUT_DIR'." + exit 1 +fi + +# Check if the *mapped* script exists +if [ ! -f "$LIB_SCRIPT" ]; then + echo "Error: Script not found for library '$LIB' (mapped to $LIB_SCRIPT)" + exit 1 +fi + +# --- Step 2: Generate responses based on LIB --- +echo "" +echo "--- Step 2: Generate responses using $LIB (3-Step Process) ---" + echo "" -echo "Running pref_memos.py in 'add' mode..." +echo "Running $LIB_SCRIPT in 'add' mode..." # Step 2a: Ingest conversations into memory and generate user_ids -python scripts/PrefEval/pref_memos.py add \ +python $LIB_SCRIPT add \ --input $PREPROCESSED_FILE \ --output $IDS_FILE \ --add-turn $ADD_TURN \ @@ -55,35 +97,49 @@ python scripts/PrefEval/pref_memos.py add \ --version $VERSION if [ $? -ne 0 ]; then - echo "Error: pref_memos.py 'add' mode failed." + echo "Error: $LIB_SCRIPT 'add' mode failed." exit 1 fi echo "" -echo "Running pref_memos.py in 'process' mode..." -# Step 2b: Search memories using user_ids and generate responses -python scripts/PrefEval/pref_memos.py process \ +echo "Running $LIB_SCRIPT in 'search' mode..." +# Step 2b: Search memories using user_ids +python $LIB_SCRIPT search \ --input $IDS_FILE \ - --output $RESPONSE_FILE \ + --output $SEARCH_FILE \ --top-k $TOP_K \ - --max-workers $WORKERS \ - --lib $LIB \ - --version $VERSION + --max-workers $WORKERS + +if [ $? -ne 0 ]; then + echo "Error: $LIB_SCRIPT 'search' mode failed." + exit 1 +fi + +echo "" +echo "Running $LIB_SCRIPT in 'response' mode..." 
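The case ladder earlier in this script maps a concrete `LIB` value (e.g. `memos-api`) to its backend script by substring match. The same mapping expressed in Python, for readers following the control flow (a sketch; file names come from the script itself):

```python
# Mirrors the shell ladder: a LIB containing <base> runs scripts/PrefEval/pref_<base>.py
def script_for_lib(lib: str) -> str:
    for base in ("mem0", "memos", "memobase", "supermemory", "memu", "zep"):
        if base in lib:
            return f"scripts/PrefEval/pref_{base}.py"
    return f"scripts/PrefEval/pref_{lib}.py"  # fallback: use LIB verbatim

print(script_for_lib("memos-api"))  # scripts/PrefEval/pref_memos.py
```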
+# Step 2c: Generate responses based on searched memories +python $LIB_SCRIPT response \ + --input $SEARCH_FILE \ + --output $RESPONSE_FILE \ + --max-workers $WORKERS if [ $? -ne 0 ]; then - echo "Error: pref_memos.py 'process' mode failed." + echo "Error: $LIB_SCRIPT 'response' mode failed." exit 1 fi # --- Step 3: Evaluate the generated responses --- echo "" echo "Running pref_eval.py..." -# Pass the WORKERS variable to the script's --concurrency-limit argument -python scripts/PrefEval/pref_eval.py --concurrency-limit $WORKERS +python scripts/PrefEval/pref_eval.py \ + --input $RESPONSE_FILE \ + --concurrency-limit $WORKERS + if [ $? -ne 0 ]; then echo "Error: Evaluation script failed." exit 1 fi echo "" -echo "--- PrefEval Pipeline completed successfully! ---" \ No newline at end of file +echo "--- PrefEval Pipeline completed successfully! ---" +echo "Final results are in $RESPONSE_FILE" diff --git a/evaluation/scripts/utils/client.py b/evaluation/scripts/utils/client.py index 87b863e86..2efb0493d 100644 --- a/evaluation/scripts/utils/client.py +++ b/evaluation/scripts/utils/client.py @@ -2,6 +2,8 @@ import os import sys import time +import uuid +from contextlib import suppress from datetime import datetime from dotenv import load_dotenv import requests @@ -17,7 +19,7 @@ def __init__(self): api_key = os.getenv("ZEP_API_KEY") self.client = Zep(api_key=api_key) - def add(self, messages, user_id, conv_id, timestamp): + def add(self, messages, user_id, timestamp): iso_date = datetime.fromtimestamp(timestamp).isoformat() for msg in messages: self.client.graph.add( @@ -49,18 +51,31 @@ def __init__(self, enable_graph=False): self.client = MemoryClient(api_key=os.getenv("MEM0_API_KEY")) self.enable_graph = enable_graph - def add(self, messages, user_id, timestamp): - if self.enable_graph: - self.client.add( - messages=messages, - timestamp=timestamp, - user_id=user_id, - output_format="v1.1", - version="v2", - enable_graph=True, - ) - else: - self.client.add(messages=messages, timestamp=timestamp, user_id=user_id, version="v2") + def add(self, messages, user_id, timestamp, batch_size=2): + max_retries = 5 + for i in range(0, len(messages), batch_size): + batch_messages = messages[i : i + batch_size] + for attempt in range(max_retries): + try: + if self.enable_graph: + self.client.add( + messages=batch_messages, + timestamp=timestamp, + user_id=user_id, + enable_graph=True, + ) + else: + self.client.add( + messages=batch_messages, + timestamp=timestamp, + user_id=user_id, + ) + break + except Exception as e: + if attempt < max_retries - 1: + time.sleep(2**attempt) + else: + raise e def search(self, query, user_id, top_k): if self.enable_graph: @@ -68,19 +83,15 @@ def search(self, query, user_id, top_k): query=query, top_k=top_k, user_id=user_id, - output_format="v1.1", - version="v2", enable_graph=True, - filters={"AND": [{"user_id": f"{user_id}"}, {"run_id": "*"}]}, + filters={"AND": [{"user_id": f"{user_id}"}]}, ) else: res = self.client.search( query=query, top_k=top_k, user_id=user_id, - output_format="v1.1", - version="v2", - filters={"AND": [{"user_id": f"{user_id}"}, {"run_id": "*"}]}, + filters={"AND": [{"user_id": f"{user_id}"}]}, ) return res @@ -93,18 +104,29 @@ def __init__(self): project_url=os.getenv("MEMOBASE_PROJECT_URL"), api_key=os.getenv("MEMOBASE_API_KEY") ) - def add(self, messages, user_id): - from memobase import ChatBlob - + def add(self, messages, user_id, batch_size=2): """ - user_id: memobase user_id messages = [{"role": "assistant", "content": data, "created_at": 
@@ -93,18 +104,29 @@ def __init__(self):
         project_url=os.getenv("MEMOBASE_PROJECT_URL"), api_key=os.getenv("MEMOBASE_API_KEY")
     )
 
-    def add(self, messages, user_id):
-        from memobase import ChatBlob
-
+    def add(self, messages, user_id, batch_size=2):
         """
-        user_id: memobase user_id
         messages = [{"role": "assistant", "content": data, "created_at": iso_date}]
         """
-        user = self.client.get_user(user_id, no_get=True)
-        user.insert(ChatBlob(messages=messages), sync=True)
+        from memobase import ChatBlob
+
+        real_uid = self.string_to_uuid(user_id)
+        user = self.client.get_or_create_user(real_uid)
+        for i in range(0, len(messages), batch_size):
+            batch_messages = messages[i : i + batch_size]
+            max_retries = 5
+            for attempt in range(max_retries):
+                try:
+                    _ = user.insert(ChatBlob(messages=batch_messages), sync=True)
+                    break  # success; without this the batch would be re-inserted on every retry
+                except Exception as e:
+                    if attempt < max_retries - 1:
+                        time.sleep(2**attempt)
+                    else:
+                        raise e
 
     def search(self, query, user_id, top_k):
-        user = self.client.get_user(user_id, no_get=True)
+        real_uid = self.string_to_uuid(user_id)
+        user = self.client.get_user(real_uid, no_get=True)
         memories = user.context(
             max_token_size=top_k * 100,
             chats=[{"role": "user", "content": query}],
@@ -113,6 +135,16 @@ def search(self, query, user_id, top_k):
         )
         return memories
 
+    def delete_user(self, user_id):
+        from memobase.error import ServerError
+
+        real_uid = self.string_to_uuid(user_id)
+        with suppress(ServerError):
+            self.client.delete_user(real_uid)
+
+    def string_to_uuid(self, s: str, salt="memobase_client"):
+        return str(uuid.uuid5(uuid.NAMESPACE_DNS, s + salt))
+
 
 class MemosApiClient:
     def __init__(self):
@@ -120,6 +152,9 @@ def __init__(self):
         self.headers = {"Content-Type": "application/json", "Authorization": os.getenv("MEMOS_KEY")}
 
     def add(self, messages, user_id, conv_id):
+        """
+        messages = [{"role": "assistant", "content": data, "chat_time": date_str}]
+        """
         url = f"{self.memos_url}/product/add"
         payload = json.dumps(
             {
@@ -155,6 +190,62 @@ def search(self, query, user_id, top_k):
         return json.loads(response.text)["data"]
 
 
+class MemosApiOnlineClient:
+    def __init__(self):
+        self.memos_url = os.getenv("MEMOS_ONLINE_URL")
+        self.headers = {"Content-Type": "application/json", "Authorization": os.getenv("MEMOS_KEY")}
+
+    def add(self, messages, user_id, conv_id=None):
+        url = f"{self.memos_url}/add/message"
+        payload = json.dumps(
+            {
+                "messages": messages,
+                "user_id": user_id,
+                "conversation_id": conv_id,
+            }
+        )
+
+        max_retries = 5
+        for attempt in range(max_retries):
+            try:
+                response = requests.request("POST", url, data=payload, headers=self.headers)
+                assert response.status_code == 200, response.text
+                assert json.loads(response.text)["message"] == "ok", response.text
+                return response.text
+            except Exception as e:
+                if attempt < max_retries - 1:
+                    time.sleep(2**attempt)
+                else:
+                    raise e
+
+    def search(self, query, user_id, top_k):
+        """Search memories."""
+        url = f"{self.memos_url}/search/memory"
+        payload = json.dumps(
+            {
+                "query": query,
+                "user_id": user_id,
+                "memory_limit_number": top_k,
+            }
+        )
+
+        max_retries = 5
+        for attempt in range(max_retries):
+            try:
+                response = requests.request("POST", url, data=payload, headers=self.headers)
+                assert response.status_code == 200, response.text
+                assert json.loads(response.text)["message"] == "ok", response.text
+                res = json.loads(response.text)["data"]["memory_detail_list"]
+                for i in res:
+                    i.update({"memory": i.pop("memory_value")})
+                return {"text_mem": [{"memories": res}]}
+            except Exception as e:
+                if attempt < max_retries - 1:
+                    time.sleep(2**attempt)
+                else:
+                    raise e
+
+
 class SupermemoryClient:
     def __init__(self):
         from supermemory import Supermemory
@@ -172,7 +263,7 @@ def add(self, messages, user_id):
                 break
             except Exception as e:
                 if attempt < max_retries - 1:
-                    time.sleep(2**attempt)  # ζŒ‡ζ•°ι€€ιΏ
+                    time.sleep(2**attempt)
                 else:
                     raise e
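`string_to_uuid` (added above) maps the caller's arbitrary string id onto a UUID that Memobase accepts. Since `uuid.uuid5` is a deterministic hash of (namespace, name), the same string always resolves to the same Memobase user across `add`, `search`, and `delete_user`. A standalone check of that property (the example ids are arbitrary):

```python
import uuid


def string_to_uuid(s: str, salt: str = "memobase_client") -> str:
    # uuid5 is deterministic: equal (namespace, name) inputs yield equal UUIDs
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, s + salt))


assert string_to_uuid("prefeval_user_7") == string_to_uuid("prefeval_user_7")
assert string_to_uuid("prefeval_user_7") != string_to_uuid("prefeval_user_8")
```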
@@ -192,7 +283,7 @@ def search(self, query, user_id, top_k):
             return context
         except Exception as e:
             if attempt < max_retries - 1:
-                time.sleep(2**attempt)  # ζŒ‡ζ•°ι€€ιΏ
+                time.sleep(2**attempt)
             else:
                 raise e
 
@@ -245,3 +336,10 @@ def wait_for_completion(self, task_id):
     timestamp = 1682899200
     query = "杭州θ₯ΏζΉ–ζœ‰δ»€δΉˆ"
     top_k = 5
+
+    # MEMOBASE
+    client = MemobaseClient()
+    for m in messages:
+        m["created_at"] = iso_date
+    client.add(messages, user_id)
+    memories = client.search(query, user_id, top_k)
diff --git a/evaluation/scripts/utils/mirix_utils.py b/evaluation/scripts/utils/mirix_utils.py
new file mode 100644
index 000000000..e1b5f3de6
--- /dev/null
+++ b/evaluation/scripts/utils/mirix_utils.py
@@ -0,0 +1,81 @@
+import os
+import yaml
+from tqdm import tqdm
+
+
+def get_mirix_client(config_path, load_from=None):
+    # Start from a clean local state: wipe any cached MIRIX data under ~/.mirix
+    if os.path.exists(os.path.expanduser("~/.mirix")):
+        os.system("rm -rf ~/.mirix/*")
+
+    with open(config_path, "r") as f:
+        agent_config = yaml.safe_load(f)
+
+    # Export the key first; the mirix import is deliberately deferred until it is set
+    os.environ["OPENAI_API_KEY"] = agent_config["api_key"]
+    import mirix
+    from mirix import Mirix, EmbeddingConfig, LLMConfig
+
+    embedding_default_config = EmbeddingConfig(
+        embedding_model=agent_config["embedding_model_name"],
+        embedding_endpoint_type="openai",
+        embedding_endpoint=agent_config["model_endpoint"],
+        embedding_dim=1536,
+        embedding_chunk_size=8191,
+    )
+
+    llm_default_config = LLMConfig(
+        model=agent_config["model_name"],
+        model_endpoint_type="openai",
+        model_endpoint=agent_config["model_endpoint"],
+        api_key=agent_config["api_key"],
+        model_wrapper=None,
+        context_window=128000,
+    )
+
+    def embedding_default_config_func(cls, model_name=None, provider=None):
+        return embedding_default_config
+
+    def llm_default_config_func(cls, model_name=None, provider=None):
+        return llm_default_config
+
+    # Override the default-config factories so every agent uses the YAML-configured endpoints
+    mirix.EmbeddingConfig.default_config = embedding_default_config_func
+    mirix.LLMConfig.default_config = llm_default_config_func
+
+    assistant = Mirix(
+        api_key=agent_config["api_key"],
+        config_path=config_path,
+        model=agent_config["model_name"],
+        load_from=load_from,
+    )
+    return assistant
+
+
+if __name__ == "__main__":
+    config_path = "configs-example/mirix_config.yaml"
+    out_dir = "results/mirix-test"
+
+    assistant = get_mirix_client(config_path)
+
+    chunks = [
+        "I prefer coffee over tea",
+        "My work hours are 9 AM to 5 PM",
+        "Important meeting with client on Friday at 2 PM",
+    ]
+
+    # Ingest each chunk into the assistant's memory
+    for _idx, chunk in tqdm(enumerate(chunks), total=len(chunks)):
+        response = assistant.add(chunk)
+
+    # Persist the agent state, then reload it from disk and query it
+    assistant.save(out_dir)
+
+    assistant = get_mirix_client(config_path, load_from=out_dir)
+    response = assistant.chat("What's my schedule like this week?")
+
+    print(response)
+
+    # Multi-user isolation: each user id addresses an independent memory space
+    assistant.create_user(user_name="user1")
+    assistant.create_user(user_name="user2")
+    user1 = assistant.get_user_by_name(user_name="user1")
+    user2 = assistant.get_user_by_name(user_name="user2")
+    assistant.add("i prefer tea over coffee", user_id=user1.id)
+    assistant.add("my favourite drink is coke", user_id=user2.id)
+    response1 = assistant.chat("What drink do I prefer?", user_id=user1.id)
+    response2 = assistant.chat("What drink do I prefer?", user_id=user2.id)
+    print(response1, response2)
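`get_mirix_client` reads exactly four keys from the YAML config. A hypothetical minimal `configs-example/mirix_config.yaml`, shown here as the dict `yaml.safe_load` is expected to return (all values are placeholders, not tested against a live MIRIX install):

```python
# Placeholder values; the keys mirror what get_mirix_client() actually reads.
agent_config = {
    "api_key": "sk-***",                               # exported as OPENAI_API_KEY
    "model_name": "gpt-4o-mini",                       # chat model for LLMConfig
    "model_endpoint": "https://api.openai.com/v1",     # OpenAI-compatible base URL
    "embedding_model_name": "text-embedding-3-small",  # embedding model for EmbeddingConfig
}
```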