From 88a5ab4ec321e2c8e9d8464aca7a1baf7a168497 Mon Sep 17 00:00:00 2001 From: Prasad Chalasani Date: Fri, 15 Nov 2024 14:58:57 -0500 Subject: [PATCH 1/2] example: update from main repo --- examples/basic/chat.py | 10 +- examples/basic/text-to-structured.py | 150 ++++++++++++++++++++++ examples/docqa/doc-aware-chat.py | 171 +++++++++++++++++++++++++ examples/kg-chat/chat-arangodb-igvf.py | 152 ++++++++++++++++++++++ examples/kg-chat/chat-arangodb.py | 9 +- 5 files changed, 480 insertions(+), 12 deletions(-) create mode 100644 examples/basic/text-to-structured.py create mode 100644 examples/docqa/doc-aware-chat.py create mode 100644 examples/kg-chat/chat-arangodb-igvf.py diff --git a/examples/basic/chat.py b/examples/basic/chat.py index 2cd8ae4..eff4cbb 100644 --- a/examples/basic/chat.py +++ b/examples/basic/chat.py @@ -41,7 +41,6 @@ def main( model: str = typer.Option("", "--model", "-m", help="model name"), no_stream: bool = typer.Option(False, "--nostream", "-ns", help="no streaming"), nocache: bool = typer.Option(False, "--nocache", "-nc", help="don't use cache"), - query: str = typer.Option("", "--query", "-q", help="initial user query or msg"), sys_msg: str = typer.Option( "You are a helpful assistant. Be concise in your answers.", "--sysmsg", @@ -83,14 +82,7 @@ def main( ) agent = ChatAgent(config) task = Task(agent) - # OpenAI models are ok with just a system msg, - # but in some scenarios, other (e.g. llama) models - # seem to do better when kicked off with a sys msg and a user msg. - # In those cases we may want to do task.run("hello") instead. - if query: - task.run(query) - else: - task.run() + task.run("hello") if __name__ == "__main__": diff --git a/examples/basic/text-to-structured.py b/examples/basic/text-to-structured.py new file mode 100644 index 0000000..56d64b3 --- /dev/null +++ b/examples/basic/text-to-structured.py @@ -0,0 +1,150 @@ +""" +Function-calling example using a local LLM, with ollama. + +"Function-calling" refers to the ability of the LLM to generate +a structured response, typically a JSON object, instead of a plain text response, +which is then interpreted by your code to perform some action. +This is also referred to in various scenarios as "Tools", "Actions" or "Plugins". +See more here: https://langroid.github.io/langroid/quick-start/chat-agent-tool/ + +Run like this (to run with llama-3.1-8b-instant via groq): + +python3 examples/basic/text-to-structured.py -m groq/llama-3.1-8b-instant + +Other models to try it with: +- ollama/qwen2.5-coder +- ollama/qwen2.5 + + +See here for how to set up a Local LLM to work with Langroid: +https://langroid.github.io/langroid/tutorials/local-llm-setup/ + + +""" + +import os +from typing import List, Literal +import fire +import json +from rich.prompt import Prompt + +from langroid.pydantic_v1 import BaseModel, Field +import langroid as lr +from langroid.utils.configuration import settings +from langroid.agent.tool_message import ToolMessage +from langroid.agent.tools.orchestration import ResultTool +import langroid.language_models as lm + +# for best results: +DEFAULT_LLM = lm.OpenAIChatModel.GPT4o + +os.environ["TOKENIZERS_PARALLELISM"] = "false" + +# (1) Define the desired structure via Pydantic. +# The "Field" annotations are optional, and are included in the system message +# if provided, and help with generation accuracy. 
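+# For instance (an illustrative input, mirroring the few-shot example further below),
+# text like "Open Spotify and Netflix, connect to HomeWifi, set brightness to medium"
+# should be extracted into the equivalent of:
+#   HomeSettings(App=["Spotify", "Netflix"], wifi=[Wifi(name="HomeWifi")], brightness="medium")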
+ + +class Wifi(BaseModel): + name: str + + +class HomeSettings(BaseModel): + App: List[str] = Field(..., description="List of apps found in text") + wifi: List[Wifi] = Field(..., description="List of wifi networks found in text") + brightness: Literal["low", "medium", "high"] = Field( + ..., description="Brightness level found in text" + ) + + +# (2) Define the Tool class for the LLM to use, to produce the above structure. +class HomeAutomationTool(lr.agent.ToolMessage): + """Tool to extract Home Automation structure from text""" + + request: str = "home_automation_tool" + purpose: str = """ + To extract structure from a given text. + """ + home_settings: HomeSettings = Field( + ..., description="Home Automation settings from given text" + ) + + def handle(self) -> str: + """Handle LLM's structured output if it matches HomeAutomationTool structure""" + print( + f""" + SUCCESS! Got Valid Home Automation Settings: + {json.dumps(self.home_settings.dict(), indent=2)} + """ + ) + return ResultTool(settings=self.home_settings) + + @classmethod + def examples(cls) -> List["ToolMessage"]: + # Used to provide few-shot examples in the system prompt + return [ + ( + """ + I have extracted apps Spotify and Netflix, + wifi HomeWifi, and brightness medium + """, + cls( + home_settings=HomeSettings( + App=["Spotify", "Netflix"], + wifi=[Wifi(name="HomeWifi")], + brightness="medium", + ) + ), + ) + ] + + +def app( + m: str = DEFAULT_LLM, # model + d: bool = False, # pass -d to enable debug mode (see prompts etc) + nc: bool = False, # pass -nc to disable cache-retrieval (i.e. get fresh answers) +): + settings.debug = d + settings.cache = not nc + # create LLM config + llm_cfg = lm.OpenAIGPTConfig( + chat_model=m or DEFAULT_LLM, + chat_context_length=4096, # set this based on model + max_output_tokens=100, + temperature=0.2, + stream=True, + timeout=45, + ) + + tool_name = HomeAutomationTool.default_value("request") + config = lr.ChatAgentConfig( + llm=llm_cfg, + system_message=f""" + You are an expert in extracting home automation settings from text. + When user gives a piece of text, use the TOOL `{tool_name}` + to present the extracted structured information. + """, + ) + + agent = lr.ChatAgent(config) + + # (4) Enable the Tool for this agent --> this auto-inserts JSON instructions + # and few-shot examples (specified in the tool defn above) into the system message + agent.enable_message(HomeAutomationTool) + + # (5) Create task and run it to start an interactive loop + # Specialize the task to return a ResultTool object + task = lr.Task(agent, interactive=False)[ResultTool] + + # set up a loop to extract Home Automation settings from text + while True: + text = Prompt.ask("[blue]Enter text (or q/x to exit)") + if not text or text.lower() in ["x", "q"]: + break + result = task.run(text) + assert isinstance(result, ResultTool) + assert isinstance(result.settings, HomeSettings) + + +if __name__ == "__main__": + fire.Fire(app) diff --git a/examples/docqa/doc-aware-chat.py b/examples/docqa/doc-aware-chat.py new file mode 100644 index 0000000..4390931 --- /dev/null +++ b/examples/docqa/doc-aware-chat.py @@ -0,0 +1,171 @@ +""" +Single Agent for Doc-aware chat with user. + +- user asks question +- LLM decides whether to: + - ask user for follow-up/clarifying information, or + - retrieve relevant passages from documents, or + - provide a final answer, if it has enough information from user and documents. 
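+
+Mechanically, this is implemented below by enabling `RetrievalTool` on the agent
+(so the LLM can request relevant passages), and by `handle_message_fallback`, which
+wraps any non-tool LLM message in a `ForwardTool(agent="User")` so it is routed to the user.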
+ +To reduce response latency, in the DocChatAgentConfig, +you can set the `relevance_extractor_config=None`, +to turn off the relevance_extraction step, which uses the LLM +to extract verbatim relevant portions of retrieved chunks. + +Run like this: + +python3 examples/docqa/doc-aware-chat.py +""" + +from typing import Optional, Any + +from rich import print +from rich.prompt import Prompt +import os + +from langroid import ChatDocument +from langroid.agent.special.doc_chat_agent import ( + DocChatAgent, + DocChatAgentConfig, +) +import langroid.language_models as lm +from langroid.mytypes import Entity +from langroid.parsing.parser import ParsingConfig, PdfParsingConfig, Splitter +from langroid.agent.chat_agent import ChatAgent +from langroid.agent.task import Task +from langroid.agent.tools.orchestration import ForwardTool +from langroid.agent.tools.retrieval_tool import RetrievalTool +from langroid.utils.configuration import set_global, Settings +from fire import Fire + +os.environ["TOKENIZERS_PARALLELISM"] = "false" + + +class DocAwareChatAgent(DocChatAgent): + def __init__(self, config: DocChatAgentConfig): + super().__init__(config) + self.enable_message(RetrievalTool) + + def retrieval_tool(self, msg: RetrievalTool) -> str: + results = super().retrieval_tool(msg) + return f""" + + RELEVANT PASSAGES: + ===== + {results} + ==== + + + BASED on these RELEVANT PASSAGES, DECIDE: + - If this is sufficient to provide the user a final answer specific to + their situation, do so. + - Otherwise, + - ASK the user for more information to get a better understanding + of their situation or context, OR + - use this tool again to get more relevant passages. + """ + + def llm_response( + self, + query: None | str | ChatDocument = None, + ) -> Optional[ChatDocument]: + # override DocChatAgent's default llm_response + return ChatAgent.llm_response(self, query) + + def handle_message_fallback(self, msg: str | ChatDocument) -> Any: + # we are here if there is no tool in the msg + if isinstance(msg, ChatDocument) and msg.metadata.sender == Entity.LLM: + # Any non-tool message must be meant for user, so forward it to user + return ForwardTool(agent="User") + + +def main( + debug: bool = False, + nocache: bool = False, + model: str = lm.OpenAIChatModel.GPT4o, +) -> None: + llm_config = lm.OpenAIGPTConfig(chat_model=model) + config = DocChatAgentConfig( + llm=llm_config, + n_query_rephrases=0, + hypothetical_answer=False, + relevance_extractor_config=None, + # this turns off standalone-query reformulation; set to False to enable it. + assistant_mode=True, + n_neighbor_chunks=2, + parsing=ParsingConfig( # modify as needed + splitter=Splitter.TOKENS, + chunk_size=100, # aim for this many tokens per chunk + n_neighbor_ids=5, + overlap=20, # overlap between chunks + max_chunks=10_000, + # aim to have at least this many chars per chunk when + # truncating due to punctuation + min_chunk_chars=200, + discard_chunk_chars=5, # discard chunks with fewer than this many chars + n_similar_docs=5, + # NOTE: PDF parsing is extremely challenging, each library has its own + # strengths and weaknesses. Try one that works for your use case. 
+ pdf=PdfParsingConfig( + # alternatives: "unstructured", "pdfplumber", "fitz" + library="fitz", + ), + ), + ) + + set_global( + Settings( + debug=debug, + cache=not nocache, + ) + ) + + doc_agent = DocAwareChatAgent(config) + print("[blue]Welcome to the document chatbot!") + url = Prompt.ask("[blue]Enter the URL of a document") + doc_agent.ingest_doc_paths([url]) + + # For a more flexible/elaborate user doc-ingest dialog, use this: + # doc_agent.user_docs_ingest_dialog() + + doc_task = Task( + doc_agent, + interactive=False, + name="DocAgent", + system_message=f""" + You are a DOCUMENT-AWARE-GUIDE, but you do NOT have direct access to documents. + Instead you can use the `retrieval_tool` to get passages from the documents + that are relevant to a certain query or search phrase or topic. + DO NOT ATTEMPT TO ANSWER THE USER'S QUESTION WITHOUT RETRIEVING RELEVANT + PASSAGES FROM THE DOCUMENTS. DO NOT use your own existing knowledge!! + Everything you tell the user MUST be based on the documents. + + The user will ask you a question that you will NOT be able to answer + immediately, because you are MISSING some information about: + - the user or their context or situation, etc + - the documents relevant to the question + + At each turn you must decide among these possible ACTIONS: + - use the `{RetrievalTool.name()}` to get more relevant passages from the + documents, OR + - ANSWER the user if you think you have enough information + from the user AND the documents, to answer the question. + + You can use the `{RetrievalTool.name()}` multiple times to get more + relevant passages, if you think the previous ones were not sufficient. + + REMEMBER - your goal is to be VERY HELPFUL to the user; this means + you should NOT OVERWHELM them by throwing them a lot of information and + ask them to figure things out. Instead, you must GUIDE them + by asking SIMPLE QUESTIONS, ONE at at time, and finally provide them + a clear, DIRECTLY RELEVANT answer that is specific to their situation. + """, + ) + + print("[cyan]Enter x or q to quit, or ? for evidence") + + doc_task.run("Can you help me with some questions?") + + +if __name__ == "__main__": + Fire(main) diff --git a/examples/kg-chat/chat-arangodb-igvf.py b/examples/kg-chat/chat-arangodb-igvf.py new file mode 100644 index 0000000..8073805 --- /dev/null +++ b/examples/kg-chat/chat-arangodb-igvf.py @@ -0,0 +1,152 @@ +""" +Single-agent to use to chat with the IGVF ArangoDB knowledge-graph (KG) on cloud. + +Make sure to set the ARANGODB_PASSWORD in your environment variables. + +Run like this (--model is optional, defaults to GPT4o): + +python3 examples/kg-chat/chat-arangodb-igvf.py --model litellm/claude-3-5-sonnet-20241022 + +If using litellm, remember to install langroid with the litellm extra, e.g. 
+pip install "langroid[litellm]" + +See these guides for info on setting up langroid to use Open/Local LLMs +and other non-OpenAI LLMs: +- https://langroid.github.io/langroid/tutorials/local-llm-setup/ +- https://langroid.github.io/langroid/tutorials/non-openai-llms/ +""" + +import os +from typing import Optional +from dotenv import load_dotenv +from rich import print + +from fire import Fire + +import langroid.language_models as lm +from langroid import TaskConfig +from langroid.agent.special.arangodb.arangodb_agent import ( + ArangoChatAgentConfig, + ArangoChatAgent, + ArangoSettings, +) +from langroid.utils.constants import SEND_TO +from langroid.agent.chat_document import ChatDocument +from langroid.agent.task import Task +from langroid.utils.configuration import Settings, set_global +import logging + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + force=True, # Add this +) +root_logger = logging.getLogger() +root_logger.setLevel(logging.ERROR) +logger = logging.getLogger(__name__) + + +class MyArangoChatAgent(ArangoChatAgent): + def user_response( + self, + msg: Optional[str | ChatDocument] = None, + ) -> Optional[ChatDocument]: + response = super().user_response(msg) + if response is not None and response.content == "r": + + self.clear_history(1) # remove all msgs after system msg + n_msgs = len(self.message_history) + assert n_msgs == 1 + logger.warning("Reset Agent history, only system msg remains") + # prompt user again + return super().user_response(msg) + + return response + + +def main( + debug: bool = False, + model: str = "", + no_stream: bool = False, + nocache: bool = False, +) -> None: + set_global( + Settings( + debug=debug, + cache=nocache, + stream=not no_stream, + ) + ) + print( + """ + [blue]Welcome to ArangoDB Knowledge Graph RAG chatbot! + Enter x or q to quit at any point. + """ + ) + + load_dotenv() + + url = "https://db.catalog.igvf.org" + username = "guest" + db = "igvf" + pw = os.getenv("ARANGODB_PASSWORD") + arango_settings = ArangoSettings( + url=url, + username=username, + database=db, + password=pw, + ) + + arango_agent = MyArangoChatAgent( + ArangoChatAgentConfig( + name="Arango", + chat_mode=True, + arango_settings=arango_settings, + prepopulate_schema=True, + use_functions_api=False, + use_tools=True, + database_created=True, + llm=lm.OpenAIGPTConfig( + chat_model=model or lm.OpenAIChatModel.GPT4o, + chat_context_length=128_000, + ), + human_prompt=( + "Human (respond, or x/q to quit, r to reset history, " + "or hit enter to continue)" + ), + ) + ) + + task_config = TaskConfig(addressing_prefix=SEND_TO) + arango_task = Task( + arango_agent, + # user not awaited, UNLESS LLM explicitly addresses user via recipient_tool + interactive=False, + config=task_config, + ) + + arango_task.run( + "Can you help with some queries? " + "Be concise and ask me for clarifications when you're not sure what I mean." + ) + + # The above runs the app in a continuous chat. 
+ # Alternatively, to set up a task to answer a single query and quit when done: + + # set up arango_agent above with chat_mode=False, set up arango_task as above, + # then run the task with a single query, e.g.: + + # result = arango_task.run("What is the location of the gene BRCA1?") + + # You can have this in a loop with the user, like so: + + # while True: + # query = Prompt.ask("Enter your query") + # if query in ["x", "q"]: + # break + # result = arango_task.run(query) + # print(result.content) + + +if __name__ == "__main__": + Fire(main) diff --git a/examples/kg-chat/chat-arangodb.py b/examples/kg-chat/chat-arangodb.py index 55a17f9..dc53082 100644 --- a/examples/kg-chat/chat-arangodb.py +++ b/examples/kg-chat/chat-arangodb.py @@ -47,7 +47,7 @@ force=True, # Add this ) root_logger = logging.getLogger() -root_logger.setLevel(logging.INFO) # Add this +root_logger.setLevel(logging.ERROR) logger = logging.getLogger(__name__) console = Console() @@ -60,7 +60,7 @@ def user_response( msg: Optional[str | ChatDocument] = None, ) -> Optional[ChatDocument]: response = super().user_response(msg) - if response.content == "r": + if response is not None and response.content == "r": self.clear_history(1) # remove all msgs after system msg n_msgs = len(self.message_history) @@ -173,7 +173,10 @@ def main( config=task_config, ) - arango_task.run() + arango_task.run( + "Can you help with some queries? " + "Be concise and ask me for clarifications when you're not sure what I mean." + ) # The above runs the app in a continuous chat. # Alternatively, to set up a task to answer a single query and quit when done: From f50f7ce23a1dc69ae55c5225e2afc2e78eeda15f Mon Sep 17 00:00:00 2001 From: Diego Margoni Date: Mon, 18 Nov 2024 00:04:22 +0100 Subject: [PATCH 2/2] add: chat and docchat agents with memories --- .../base_memory_chat_agent.py | 125 +++++++++++ .../memory-chat-agent/memory_chat_agent.py | 155 +++++++++++++ .../memory_doc_chat_agent.py | 207 ++++++++++++++++++ 3 files changed, 487 insertions(+) create mode 100644 examples/quick-start/memory-chat-agent/base_memory_chat_agent.py create mode 100644 examples/quick-start/memory-chat-agent/memory_chat_agent.py create mode 100644 examples/quick-start/memory-chat-agent/memory_doc_chat_agent.py diff --git a/examples/quick-start/memory-chat-agent/base_memory_chat_agent.py b/examples/quick-start/memory-chat-agent/base_memory_chat_agent.py new file mode 100644 index 0000000..8b93789 --- /dev/null +++ b/examples/quick-start/memory-chat-agent/base_memory_chat_agent.py @@ -0,0 +1,125 @@ +"""Base classes and implementations for chat agents with conversation memory.""" + +from typing import List, Optional, Tuple +from langroid.mytypes import Document +from abc import ABC, abstractmethod +from langroid.vector_store.base import VectorStore + + +class BaseMemoryChatAgent(ABC): + """Base class with shared memory functionality.""" + + vecdb: Optional[VectorStore] = None + + def __init__(self, username: Optional[str] = None) -> None: + """Initialize BaseMemoryChatAgent.""" + self.username = username + + @abstractmethod + def _store_document(self, document: Document) -> None: + """Store document in vector database.""" + + @abstractmethod + def _create_conversation_document(self, message: str, response: str) -> Document: + """Create conversation document from message and response.""" + + def store_conversation(self, message: str | None, response: str) -> None: + """Store conversation turn in vector database.""" + if self.vecdb is None or not message: + return + + # Check if 
similar content already exists + similar = self.vecdb.similar_texts_with_scores( + text=message, + k=1, + ) + if similar and similar[0][1] > 0.95: + return + + # Create conversation document (implementation specific to child classes) + conversation = self._create_conversation_document(message, response) + self._store_document(conversation) + + def get_relevant_context(self, query: Optional[str], k: int = 3) -> str: + """Context retrieval with filtering and timestamp sorting.""" + if self.vecdb is None: + return "" + + # If query is empty or None, get most recent messages + if not query or query.strip() == "": + return self._get_recent_context(k) + + # Get more results initially + results = self.vecdb.similar_texts_with_scores( + text=query, + k=k * 2, + ) + if not results: + return self.get_relevant_context(None, k) + + # Filter and sort results + filtered_results = self._filter_and_sort_results(results, k) + + if filtered_results: + context = "\n---\n".join([doc.content for doc in filtered_results]) + return f"\nRelevant conversation history:\n{context.strip()}" + + return self.get_relevant_context(None, k) + + def _get_recent_context(self, k: int) -> str: + """Get recent conversation context.""" + all_docs = self.vecdb.get_all_documents() + sorted_docs = sorted( + all_docs, + key=lambda x: x.metadata.timestamp if hasattr(x.metadata, "timestamp") else "", + reverse=True, + ) + recent_docs = sorted_docs[:k] + if recent_docs: + context = "\n---\n".join([doc.content for doc in recent_docs]) + return f"\nRecent conversation history:\n{context.strip()}" + return "" + + def _filter_and_sort_results(self, results: List[Tuple[Document, float]], k: int) -> List[Document]: + """Filter and sort search results.""" + filtered_results = [] + seen_content = set() + + for doc, score in results: + content = doc.content.strip() + if content not in seen_content and score > 0.7: + filtered_results.append(doc) + seen_content.add(content) + + filtered_results.sort( + key=lambda x: x.metadata.timestamp if hasattr(x.metadata, "timestamp") else "", reverse=True + ) + + return filtered_results[:k] + + def generate_system_prompt(self) -> str: + """Generate system prompt with relevant context.""" + prompt = f"""You are a helpful assistant with conversation memory. + You are talking to {self.username}.""" + + history = self.get_relevant_context("") + if history: + prompt += f"\n\nhere some of the past messages:\n{history}" + + return prompt + "\n\nPlease start the conversation." + + def _get_enriched_message(self, message_str: str) -> str: + """Add history context to user message.""" + context = self.get_relevant_context(message_str) if message_str else None + + if context: + enhanced_prompt = ( + f"HISTORY:\n{context}\n\n" + f"Current message: {message_str}\n\n" + "Respond to the current message, using the previous conversations " + "to personalize your response when relevant." + ) + else: + enhanced_prompt = message_str + + return enhanced_prompt diff --git a/examples/quick-start/memory-chat-agent/memory_chat_agent.py b/examples/quick-start/memory-chat-agent/memory_chat_agent.py new file mode 100644 index 0000000..c8a092f --- /dev/null +++ b/examples/quick-start/memory-chat-agent/memory_chat_agent.py @@ -0,0 +1,155 @@ +""" +Enhanced chat agent that remembers conversations using RAG. + +Basically it stores the conversation turns in a vector database and retrieves +relevant context to personalize responses for different users with different sessions. 
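+
+Roughly, each turn in MemoryChatAgent.llm_response below amounts to:
+
+    enriched = self._get_enriched_message(message)      # prepend relevant past turns (RAG lookup)
+    response = super().llm_response(enriched)           # LLM answers with that memory in context
+    self.store_conversation(message, response.content)  # persist this turn for future sessions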
+ +Run as follows +python3 examples/quick-start/memory-chat-agent/memory_chat_agent.py -u {username} + +To test try 2 different run with the same username and different messages: + +run 1 + USER: I am a software engineer + LLM: {some response} + <> + +run 2 + USER: What do I do for a living? + LLM: {some response} should be related to software engineering +""" + +import typer +from rich import print +from typing import Optional, List +from langroid.language_models.base import ( + LLMMessage, +) +from datetime import datetime +import langroid as lr +from langroid.utils.configuration import Settings +from langroid.vector_store.qdrantdb import QdrantDBConfig +from langroid.agent.chat_document import ChatDocument +from langroid.mytypes import DocMetaData +from base_memory_chat_agent import BaseMemoryChatAgent + +app = typer.Typer() + +lr.utils.logging.setup_colored_logging() + + +class MemoryChatAgent(lr.ChatAgent, BaseMemoryChatAgent): + """Chat agent with conversation memory.""" + + def __init__( + self, + config: lr.ChatAgentConfig, + task: Optional[List[LLMMessage]] = None, + username: Optional[str] = None, + ) -> None: + """Initialize MemoryChatAgent.""" + lr.ChatAgent.__init__(self, config, task) + BaseMemoryChatAgent.__init__(self, username) + + def _create_conversation_document(self, message: str, response: str) -> ChatDocument: + """Create conversation document from message and response.""" + return ChatDocument( + content=f"User ({self.username}): {message}\nAssistant: {response}", + metadata=DocMetaData( + sender=lr.Entity.USER, + sender_name=self.username, + timestamp=datetime.now().isoformat(), + conversation_type="dialogue", + ), + ) + + def _store_document(self, document: ChatDocument) -> None: + """Store document in vector database.""" + self.vecdb.add_documents([document]) + + def llm_response(self, message: Optional[str | ChatDocument] = None) -> Optional[ChatDocument]: + """Enhanced llm_response with memory retrieval.""" + if not self.llm_can_respond(message): + return None + + message_str = message.content if isinstance(message, ChatDocument) else message + enriched_message = self._get_enriched_message(message_str) + response = super().llm_response(enriched_message) + + if response: + self.store_conversation(message_str, response.content) + + return response + + +def setup_vecdb(docker: bool, reset: bool, username: Optional[str] = None) -> QdrantDBConfig: + """Configure vector database.""" + collection_name = "conversation_memory" + if username: + collection_name += f"_{username}" + + return QdrantDBConfig(collection_name=collection_name, replace_collection=reset, docker=docker) + + +def chat( + docker_vecdb: bool = False, + reset_memory: bool = False, + username: Optional[str] = "user", + init_message: str = "", +) -> None: + """Run the chatbot with memory.""" + print( + """ + [blue]Welcome to the enhanced chatbot with memory! 
+ Enter x or q to quit + """ + ) + + def create_agent(username: Optional[str] = None) -> MemoryChatAgent: + vecdb_config = setup_vecdb(docker_vecdb, reset_memory, username) + + config = lr.ChatAgentConfig( + llm=lr.language_models.OpenAIGPTConfig( + chat_model=lr.language_models.OpenAIChatModel.GPT4, + ), + vecdb=vecdb_config, + ) + + agent = MemoryChatAgent(config=config, username=username) + # Update system message based on user + agent.set_system_message(agent.generate_system_prompt()) + return agent + + # Initial agent without user context + agent = create_agent(username=username) + + # Create task + task = lr.Task(agent, name="MemoryBot") + + # Run the task + task.run(msg=init_message or None) + + +@app.command() +def main( + debug: bool = typer.Option(False, "--debug", "-d", help="debug mode"), + no_stream: bool = typer.Option(False, "--nostream", "-ns", help="no streaming"), + nocache: bool = typer.Option(False, "--nocache", "-nc", help="don't use cache"), + docker: bool = typer.Option(True, "--docker", help="use docker for vector database"), + reset: bool = typer.Option(False, "--reset", help="reset conversation memory"), + username: str = typer.Option("user", "--user", "-u", help="user name"), + init_message: str = typer.Option("", "--msg", "-m", help="initial message"), +) -> None: + """Main app function.""" + lr.utils.configuration.set_global( + Settings( + debug=debug, + cache=not nocache, + stream=not no_stream, + ) + ) + chat(docker_vecdb=docker, reset_memory=reset, username=username, init_message=init_message) + + +if __name__ == "__main__": + app() diff --git a/examples/quick-start/memory-chat-agent/memory_doc_chat_agent.py b/examples/quick-start/memory-chat-agent/memory_doc_chat_agent.py new file mode 100644 index 0000000..e2bfa47 --- /dev/null +++ b/examples/quick-start/memory-chat-agent/memory_doc_chat_agent.py @@ -0,0 +1,207 @@ +""" +Enhanced chat agent that remembers conversations using RAG. + +Same functionalities as ConversationMemoryAgent but with the all the utility provided +by the DocChatAgent class. +In this case the "documents" are simply the past conversations stored in the vector database. + +Run as follows: +python3 examples/quick-start/memory-chat-agent/memory_doc_chat_agent.py -u {username} + +To test try 2 different run with the same username and different messages: + +run 1 + USER: I am a software engineer + LLM: {some response} + <> + +run 2 + USER: What do I do for a living? 
+ LLM: {some response} should be related to software engineering +""" + +import typer +from rich import print +from typing import Callable, List, Optional, Tuple +from datetime import datetime +import langroid as lr +from langroid.utils.configuration import Settings +from langroid.vector_store.qdrantdb import QdrantDBConfig +from langroid.agent.special.doc_chat_agent import DocChatAgent, DocChatAgentConfig +from langroid.mytypes import Document, DocMetaData +from langroid.agent.chat_document import ChatDocument +from langroid.utils.constants import NO_ANSWER +from base_memory_chat_agent import BaseMemoryChatAgent + + +app = typer.Typer() + +lr.utils.logging.setup_colored_logging() + + +class ConversationMemoryDocAgent(DocChatAgent, BaseMemoryChatAgent): + """DocChatAgent with conversation memory capabilities.""" + + def __init__( + self, + config: DocChatAgentConfig, + username: Optional[str] = None, + ) -> None: + """Initialize ConversationMemoryDocAgent.""" + DocChatAgent.__init__(self, config) + BaseMemoryChatAgent.__init__(self, username) + self.set_system_message(self.generate_system_prompt()) + self.set_user_message("") + + def _create_conversation_document(self, message: str, response: str) -> Document: + """Create conversation document from message and response.""" + return Document( + content=f"User ({self.username}): {message}\nAssistant: {response}", + metadata=DocMetaData( + source="conversation", + sender_name=self.username, + timestamp=datetime.now().isoformat(), + conversation_type="dialogue", + ), + ) + + def _store_document(self, document: Document) -> None: + """Store document in vector database.""" + self.ingest_docs([document], split=True) + + def answer_from_docs(self, query: str) -> ChatDocument: + """ + Answer from documents with context. + + Override to include memory retrieval in response. + If storage is empty, return LLM response. + """ + answer = super().answer_from_docs(query) + if not answer or answer.content == NO_ANSWER: + return self.llm_response_messages(query) + return answer + + def llm_response( + self, + query: None | str | ChatDocument = None, + ) -> Optional[ChatDocument]: + """Override llm_response to include memory retrieval in response.""" + if not self.llm_can_respond(query): + return None + + message_str = query.content if isinstance(query, ChatDocument) else query + enriched_message = self._get_enriched_message(message_str) + response = super().llm_response(enriched_message) + + if response: + self.store_conversation(message_str, response.content) + + return response + + def entity_responders( + self, + ) -> List[Tuple[lr.Entity, Callable[[None | str | ChatDocument], None | ChatDocument]]]: + """We don't want to involve Agent responders in this case.""" + return [ + (lr.Entity.LLM, self.llm_response), + (lr.Entity.USER, self.user_response), + ] + + +def setup_vecdb(docker: bool, reset: bool, username: Optional[str] = None) -> QdrantDBConfig: + """Configure vector database.""" + collection_name = "conversation_memory" + if username: + collection_name += f"_{username}" + + return QdrantDBConfig(collection_name=collection_name, replace_collection=reset, docker=docker) + + +def chat( + docker_vecdb: bool = False, + reset_memory: bool = False, + username: Optional[str] = "user", + init_message: str = "", +) -> None: + """Run the chatbot with memory using DocChatAgent.""" + print( + """ + [blue]Welcome to the enhanced chatbot with memory! 
+ Enter x or q to quit + """ + ) + + def create_agent(username: Optional[str] = None) -> ConversationMemoryDocAgent: + vecdb_config = setup_vecdb(docker_vecdb, reset_memory, username) + + config = DocChatAgentConfig( + llm=lr.language_models.OpenAIGPTConfig( + chat_model=lr.language_models.OpenAIChatModel.GPT4o_MINI, + ), + vecdb=vecdb_config, + # Enable hypothetical answers for better RAG + hypothetical_answer=True, + # Enable query rephrasing for better recall + n_query_rephrases=2, + # Enable various search methods + use_fuzzy_match=True, + use_bm25_search=True, + # Better ranking + use_reciprocal_rank_fusion=True, + # Add neighbors for better context + n_neighbor_chunks=1, + # Avoid redundancy in results + rerank_diversity=True, + # Handle long contexts better + rerank_periphery=True, + # Enable streaming for better UX + stream=True, + conversation_mode=True, + ) + + agent = ConversationMemoryDocAgent(config=config, username=username) + + # Check if collection is empty and handle appropriately + if agent.vecdb is None or agent.vecdb.config.collection_name not in agent.vecdb.list_collections(): + print("[yellow]Starting fresh conversation - no previous memory found.") + else: + print("[green]Found existing conversation history.") + + return agent + + # Create agent with user context + agent = create_agent(username=username) + + # Create task + task = lr.Task( + agent, + name="MemoryBot", + ) + + # Run the task + task.run(msg=init_message or None) + + +@app.command() +def main( + debug: bool = typer.Option(False, "--debug", "-d", help="debug mode"), + no_stream: bool = typer.Option(False, "--nostream", "-ns", help="no streaming"), + nocache: bool = typer.Option(False, "--nocache", "-nc", help="don't use cache"), + docker: bool = typer.Option(True, "--docker", help="use docker for vector database"), + reset: bool = typer.Option(False, "--reset", help="reset conversation memory"), + username: str = typer.Option("user", "--user", "-u", help="user name"), + init_message: str = typer.Option("", "--msg", "-m", help="initial message"), +) -> None: + """Main app function.""" + lr.utils.configuration.set_global( + Settings( + debug=debug, + cache=not nocache, + stream=not no_stream, + ) + ) + chat(docker_vecdb=docker, reset_memory=reset, username=username, init_message=init_message) + + +if __name__ == "__main__": + app()