diff --git a/autogen/agents/experimental/document_agent/document_agent.py b/autogen/agents/experimental/document_agent/document_agent.py
index 8515e1c25b3..9ee80538296 100644
--- a/autogen/agents/experimental/document_agent/document_agent.py
+++ b/autogen/agents/experimental/document_agent/document_agent.py
@@ -5,29 +5,21 @@
 import logging
 from copy import deepcopy
 from pathlib import Path
-from typing import Annotated, Any, cast
+from typing import Any, cast
 
 from pydantic import BaseModel, Field
 
 from .... import Agent, ConversableAgent, UpdateSystemMessage
 from ....agentchat.contrib.rag.query_engine import RAGQueryEngine
-from ....agentchat.group.context_condition import ExpressionContextCondition
-from ....agentchat.group.context_expression import ContextExpression
 from ....agentchat.group.context_variables import ContextVariables
-from ....agentchat.group.llm_condition import StringLLMCondition
 from ....agentchat.group.multi_agent_chat import initiate_group_chat
-from ....agentchat.group.on_condition import OnCondition
-from ....agentchat.group.on_context_condition import OnContextCondition
 from ....agentchat.group.patterns.pattern import DefaultPattern
-from ....agentchat.group.reply_result import ReplyResult
-from ....agentchat.group.targets.transition_target import AgentNameTarget, AgentTarget, StayTarget, TerminateTarget
+from ....agentchat.group.targets.transition_target import AgentTarget, TerminateTarget
 from ....doc_utils import export_module
 from ....llm_config import LLMConfig
-from ....oai.client import OpenAIWrapper
-from .chroma_query_engine import VectorChromaQueryEngine
-from .docling_doc_ingest_agent import DoclingDocIngestAgent
-from .document_conditions import SummaryTaskAvailableCondition
+from .chroma_query_engine import VectorChromaCitationQueryEngine, VectorChromaQueryEngine
 from .document_utils import Ingest, Query
+from .task_manager import TaskManagerAgent
 
 __all__ = ["DocAgent"]
 
@@ -38,55 +30,6 @@
     You are given a list of documents to ingest and a list of queries to perform.
     You are responsible for ingesting the documents and answering the queries.
 """
-TASK_MANAGER_NAME = "TaskManagerAgent"
-TASK_MANAGER_SYSTEM_MESSAGE = """
-    You are a task manager agent. You have 2 priorities:
-    1. You initiate the tasks which updates the context variables based on the task decisions (DocumentTask) from the DocumentTriageAgent.
-    ALWAYS call initiate_tasks first when you receive a message from the DocumentTriageAgent, even if you think there are no new tasks.
-    This ensures that any new ingestions or queries from the triage agent are properly recorded.
-    Put all ingestion and query tasks into the one tool call.
-        i.e. output
-        {
-            "ingestions": [
-                {
-                    "path_or_url": "path_or_url"
-                }
-            ],
-            "queries": [
-                {
-                    "query_type": "RAG_QUERY",
-                    "query": "query"
-                }
-            ],
-            "query_results": [
-                {
-                    "query": "query",
-                    "result": "result"
-                }
-            ]
-        }
-    2. If there are no documents to ingest and no queries to run, hand control off to the summary agent.
-
-    Put all file paths and URLs into the ingestions. A http/https URL is also a valid path and should be ingested.
-
-    Use the initiate_tasks tool to incorporate all ingestions and queries. Don't call it again until new ingestions or queries are raised.
-
-    New ingestions and queries may be raised from time to time, so use the initiate_tasks again if you see new ingestions/queries.
-
-    Transfer to the summary agent if all ingestion and query tasks are done.
-    """
-
-DEFAULT_ERROR_GROUP_CHAT_MESSAGE: str = """
-Document Agent failed to perform task.
-"""
-
-ERROR_MANAGER_NAME = "ErrorManagerAgent"
-ERROR_MANAGER_SYSTEM_MESSAGE = """
-You communicate errors to the user. Include the original error messages in full. Use the format:
-The following error(s) have occurred:
-- Error 1
-- Error 2
-"""
 
 
 class DocumentTask(BaseModel):
@@ -95,49 +38,34 @@ class DocumentTask(BaseModel):
     ingestions: list[Ingest] = Field(description="The list of documents to ingest.")
     queries: list[Query] = Field(description="The list of queries to perform.")
 
-    def format(self) -> str:
-        """Format the DocumentTask as a string for the TaskManager to work with."""
-        if len(self.ingestions) == 0 and len(self.queries) == 0:
-            return "There were no ingestion or query tasks detected."
-
-        instructions = "Tasks:\n\n"
-        order = 1
-
-        if len(self.ingestions) > 0:
-            instructions += "Ingestions:\n"
-            for ingestion in self.ingestions:
-                instructions += f"{order}: {ingestion.path_or_url}\n"
-                order += 1
-
-            instructions += "\n"
-
-        if len(self.queries) > 0:
-            instructions += "Queries:\n"
-            for query in self.queries:
-                instructions += f"{order}: {query.query}\n"
-                order += 1
-
-        return instructions
-
 
 class DocumentTriageAgent(ConversableAgent):
     """The DocumentTriageAgent is responsible for deciding what type of task to perform from user requests."""
 
-    def __init__(self, llm_config: LLMConfig | dict[str, Any] | None = None):
+    def __init__(self, llm_config: LLMConfig | dict[str, Any] | None = None, custom_system_message: str | None = None):
+        """Create the triage agent; a non-empty ``custom_system_message`` replaces the default prompt."""
+        # Fall back to the current LLMConfig; fail fast if there is none, because response_format needs a config
+        llm_config = LLMConfig.get_current_llm_config() if llm_config is None else llm_config
+        if llm_config is None:
+            raise ValueError("DocumentTriageAgent requires an llm_config; none was passed and none is currently set.")
+
         # Add the structured message to the LLM configuration
         structured_config_list = deepcopy(llm_config)
         structured_config_list["response_format"] = DocumentTask  # type: ignore[index]
 
+        default_system_message = (
+            "You are a document triage agent. "
+            "You are responsible for deciding what type of task to perform from a user's request and populating a DocumentTask formatted response. "
+            "If the user specifies files or URLs, add them as individual 'ingestions' to DocumentTask. "
+            "You can access external websites if given a URL, so put them in as ingestions. "
+            "Add the user's questions about the files/URLs as individual 'RAG_QUERY' queries to the 'query' list in the DocumentTask. "
+            "Don't make up questions, keep it as concise and close to the user's request as possible."
+        )
+        system_message = custom_system_message or default_system_message
+
         super().__init__(
             name="DocumentTriageAgent",
-            system_message=(
-                "You are a document triage agent. "
-                "You are responsible for deciding what type of task to perform from a user's request and populating a DocumentTask formatted response. "
-                "If the user specifies files or URLs, add them as individual 'ingestions' to DocumentTask. "
-                "You can access external websites if given a URL, so put them in as ingestions. "
-                "Add the user's questions about the files/URLs as individual 'RAG_QUERY' queries to the 'query' list in the DocumentTask. "
-                "Don't make up questions, keep it as concise and close to the user's request as possible."
-            ),
+            system_message=system_message,
             human_input_mode="NEVER",
             llm_config=structured_config_list,
         )
@@ -147,7 +75,10 @@ def __init__(self, llm_config: LLMConfig | dict[str, Any] | None = None):
 class DocAgent(ConversableAgent):
     """The DocAgent is responsible for ingest and querying documents.
 
-    Internally, it generates a group chat with a set of agents to ingest, query, and summarize.
+    Internally, it generates a group chat with a simplified set of agents:
+    - TriageAgent: Analyzes user requests and creates DocumentTask
+    - TaskManagerAgent: Processes documents and queries using integrated tools
+    - SummaryAgent: Provides final summary of all operations
     """
 
     def __init__(
@@ -158,36 +89,39 @@ def __init__(
         parsed_docs_path: str | Path | None = None,
         collection_name: str | None = None,
         query_engine: RAGQueryEngine | None = None,
+        enable_citations: bool = False,
+        citation_chunk_size: int = 512,
+        update_inner_agent_system_message: dict[str, Any] | None = None,
+        rag_config: dict[str, dict[str, Any]] | None = None,  # e.g. {"vector": {}, "graph": {...}}
     ):
         """Initialize the DocAgent.
 
         Args:
-            name (Optional[str]): The name of the DocAgent.
-            llm_config (Optional[LLMConfig, dict[str, Any]]): The configuration for the LLM.
-            system_message (Optional[str]): The system message for the DocAgent.
-            parsed_docs_path (Union[str, Path]): The path where parsed documents will be stored.
-            collection_name (Optional[str]): The unique name for the data store collection. If omitted, a random name will be used. Populate this to reuse previous ingested data.
-            query_engine (Optional[RAGQueryEngine]): The query engine to use for querying documents, defaults to VectorChromaQueryEngine if none provided.
-                                                     Use enable_query_citations and implement query_with_citations method to enable citation support. e.g. VectorChromaCitationQueryEngine
-
-        The DocAgent is responsible for generating a group of agents to solve a task.
-
-        The agents that the DocAgent generates are:
-        - Triage Agent: responsible for deciding what type of task to perform from user requests.
-        - Task Manager Agent: responsible for managing the tasks.
-        - Parser Agent: responsible for parsing the documents.
-        - Data Ingestion Agent: responsible for ingesting the documents.
-        - Query Agent: responsible for answering the user's questions.
-        - Error Agent: responsible for returning errors gracefully.
-        - Summary Agent: responsible for generating a summary of the user's questions.
+            name: The name of the DocAgent.
+            llm_config: The configuration for the LLM.
+            system_message: The system message for the DocAgent.
+            parsed_docs_path: The path where parsed documents will be stored.
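+            collection_name: The unique name for the data store collection. If omitted, a random name will be used. Populate this to reuse previously ingested data.
+            query_engine: The query engine to use for querying documents. If None, defaults to VectorChromaCitationQueryEngine when enable_citations is True, otherwise VectorChromaQueryEngine.
+            enable_citations: Whether to enable citation support in queries. When query_engine is None, selects VectorChromaCitationQueryEngine as the default engine. Defaults to False.
+            citation_chunk_size: The size of chunks to use for citations, passed to the default VectorChromaCitationQueryEngine. Defaults to 512.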
+            update_inner_agent_system_message: Dictionary mapping inner agent names to custom system messages.
+                Keys: "DocumentTriageAgent", "TaskManagerAgent", "SummaryAgent"
+                Values: Custom system message strings for each agent.
+            rag_config: Configuration for RAG engines {"vector": {}, "graph": {...}}.
         """
         name = name or "DocAgent"
         llm_config = llm_config or LLMConfig.get_current_llm_config()
         system_message = system_message or DEFAULT_SYSTEM_MESSAGE
         parsed_docs_path = parsed_docs_path or "./parsed_docs"
+        update_inner_agent_system_message = update_inner_agent_system_message or {}
 
-        # Default Query Engine will be ChromaDB
-        if query_engine is None:
+        # Initialize query engine based on citation preference
+        if query_engine is None and enable_citations:
+            query_engine = VectorChromaCitationQueryEngine(
+                collection_name=collection_name, enable_query_citations=True, citation_chunk_size=citation_chunk_size
+            )
+        elif query_engine is None:
             query_engine = VectorChromaQueryEngine(collection_name=collection_name)
 
         super().__init__(
@@ -196,317 +130,57 @@ def __init__(
             llm_config=llm_config,
             human_input_mode="NEVER",
         )
-        self.register_reply([ConversableAgent, None], self.generate_inner_group_chat_reply, position=0)
-
-        self._triage_agent = DocumentTriageAgent(llm_config=llm_config)
-
-        def create_error_agent_prompt(agent: ConversableAgent, messages: list[dict[str, Any]]) -> str:
-            """Create the error agent prompt, primarily used to update ingested documents for ending.
-
-            Args:
-                agent: The conversable agent requesting the prompt
-                messages: List of conversation messages
-
-            Returns:
-                str: The error manager system message
-            """
-            update_ingested_documents()
-
-            return ERROR_MANAGER_SYSTEM_MESSAGE
-
-        self._error_agent = ConversableAgent(
-            name=ERROR_MANAGER_NAME,
-            system_message=ERROR_MANAGER_SYSTEM_MESSAGE,
-            llm_config=llm_config,
-            update_agent_state_before_reply=[UpdateSystemMessage(create_error_agent_prompt)],
-        )
-
-        def update_ingested_documents() -> None:
-            """Updates the list of ingested documents, persisted so we can keep a list over multiple replies.
-
-            This function updates self.documents_ingested with any new documents that have been ingested
-            by the triage agent, ensuring persistence across multiple DocAgent interactions.
-            """
-            agent_documents_ingested = self._triage_agent.context_variables.get("DocumentsIngested", [])
-            # Update self.documents_ingested with any new documents ingested
-            for doc in agent_documents_ingested:  # type: ignore[union-attr]
-                if doc not in self.documents_ingested:
-                    self.documents_ingested.append(doc)
-
-        class TaskInitInfo(BaseModel):
-            ingestions: Annotated[list[Ingest], Field(description="List of documents, files, and URLs to ingest")]
-            queries: Annotated[list[Query], Field(description="List of queries to run")]
-
-        def _deduplicate_ingestions(
-            new_ingestions: list[Ingest], existing_ingestions: list[Ingest], documents_ingested: list[str]
-        ) -> tuple[list[Ingest], list[str]]:
-            """Deduplicate ingestions against existing pending and already ingested documents.
-
-            Args:
-                new_ingestions: List of new ingestion requests to process
-                existing_ingestions: List of ingestions already pending
-                documents_ingested: List of document paths already ingested
-
-            Returns:
-                tuple: (new_unique_ingestions, ignored_duplicate_paths)
-            """
-            unique_ingestions = []
-            ignored_paths = []
-
-            for ingestion in new_ingestions:
-                ingestion_path = ingestion.path_or_url
-                # Check if already in pending ingestions
-                already_pending = any(existing.path_or_url == ingestion_path for existing in existing_ingestions)
-                # Check if already ingested
-                already_ingested = ingestion_path in documents_ingested
-
-                if already_pending or already_ingested:
-                    ignored_paths.append(ingestion_path)
-                else:
-                    unique_ingestions.append(ingestion)
-
-            return unique_ingestions, ignored_paths
-
-        def _deduplicate_queries(
-            new_queries: list[Query], existing_queries: list[Query]
-        ) -> tuple[list[Query], list[str]]:
-            """Deduplicate queries against existing pending queries.
-
-            Args:
-                new_queries: List of new query requests to process
-                existing_queries: List of queries already pending
-
-            Returns:
-                tuple: (new_unique_queries, ignored_duplicate_query_texts)
-            """
-            unique_queries = []
-            ignored_query_texts = []
-
-            for query in new_queries:
-                query_text = query.query
-                # Check if query already exists in pending queries
-                already_pending = any(existing.query == query_text for existing in existing_queries)
-
-                if already_pending:
-                    ignored_query_texts.append(query_text)
-                else:
-                    unique_queries.append(query)
-
-            return unique_queries, ignored_query_texts
-
-        def _build_response_message(
-            added_ingestions: int, ignored_ingestions: list[str], added_queries: int, ignored_queries: list[str]
-        ) -> str:
-            """Build a descriptive response message about what was added/ignored.
-
-            Args:
-                added_ingestions: Number of unique ingestions added
-                ignored_ingestions: List of duplicate ingestion paths ignored
-                added_queries: Number of unique queries added
-                ignored_queries: List of duplicate query texts ignored
-
-            Returns:
-                str: Formatted message describing the results
-            """
-            messages = []
-
-            if added_ingestions > 0:
-                messages.append(f"Added {added_ingestions} new document(s) for ingestion")
-
-            if ignored_ingestions:
-                messages.append(
-                    f"Ignored {len(ignored_ingestions)} duplicate document(s): {', '.join(ignored_ingestions)}"
-                )
-
-            if added_queries > 0:
-                messages.append(f"Added {added_queries} new query/queries")
-
-            if ignored_queries:
-                messages.append(f"Ignored {len(ignored_queries)} duplicate query/queries: {', '.join(ignored_queries)}")
-
-            if messages:
-                return "; ".join(messages)
-            else:
-                return "All requested tasks were duplicates and ignored"
-
-        def initiate_tasks(
-            task_init_info: Annotated[TaskInitInfo, "Documents, Files, URLs to ingest and the queries to run"],
-            context_variables: Annotated[ContextVariables, "Context variables"],
-        ) -> ReplyResult:
-            """Add documents to ingest and queries to answer when received.
-
-            Args:
-                task_init_info: Information about documents to ingest and queries to run
-                context_variables: The current context variables containing task state
-
-            Returns:
-                ReplyResult: Contains response message, updated context, and target agent
-            """
-            ingestions = task_init_info.ingestions
-            queries = task_init_info.queries
-
-            if "TaskInitiated" in context_variables:
-                # Handle follow-up tasks with deduplication
-                added_ingestions_count = 0
-                ignored_ingestions = []
-                added_queries_count = 0
-                ignored_queries = []
-
-                if ingestions:
-                    existing_ingestions: list[Ingest] = context_variables.get("DocumentsToIngest", [])  # type: ignore[assignment]
-                    documents_ingested: list[str] = context_variables.get("DocumentsIngested", [])  # type: ignore[assignment]
-
-                    unique_ingestions, ignored_ingestion_paths = _deduplicate_ingestions(
-                        ingestions, existing_ingestions, documents_ingested
-                    )
-
-                    if unique_ingestions:
-                        context_variables["DocumentsToIngest"] = existing_ingestions + unique_ingestions
-                        added_ingestions_count = len(unique_ingestions)
-
-                    ignored_ingestions = ignored_ingestion_paths
-
-                if queries:
-                    existing_queries: list[Query] = context_variables.get("QueriesToRun", [])  # type: ignore[assignment]
+        self.register_reply([ConversableAgent, None], DocAgent.generate_inner_group_chat_reply, position=0)
 
-                    unique_queries, ignored_query_texts = _deduplicate_queries(queries, existing_queries)
-
-                    if unique_queries:
-                        context_variables["QueriesToRun"] = existing_queries + unique_queries
-                        added_queries_count = len(unique_queries)
-
-                    ignored_queries = ignored_query_texts
-
-                if not ingestions and not queries:
-                    return ReplyResult(message="No new tasks to initiate", context_variables=context_variables)
-
-                response_message = _build_response_message(
-                    added_ingestions_count, ignored_ingestions, added_queries_count, ignored_queries
-                )
-
-            else:
-                # First time initialization - no deduplication needed
-                context_variables["DocumentsToIngest"] = ingestions
-                context_variables["QueriesToRun"] = list(queries)
-                context_variables["TaskInitiated"] = True
-                response_message = "Updated context variables with task decisions"
-
-            return ReplyResult(
-                message=response_message,
-                context_variables=context_variables,
-                target=AgentNameTarget(agent_name=TASK_MANAGER_NAME),
-            )
-
-        self._task_manager_agent = ConversableAgent(
-            name=TASK_MANAGER_NAME,
-            system_message=TASK_MANAGER_SYSTEM_MESSAGE,
-            llm_config=llm_config,
-            functions=[initiate_tasks],
+        # Initialize agents with custom system messages if provided
+        self._triage_agent = DocumentTriageAgent(
+            llm_config=llm_config, custom_system_message=update_inner_agent_system_message.get("DocumentTriageAgent")
         )
 
-        self._triage_agent.handoffs.set_after_work(target=AgentTarget(agent=self._task_manager_agent))
-
-        self._data_ingestion_agent = DoclingDocIngestAgent(
+        self._task_manager_agent = TaskManagerAgent(
             llm_config=llm_config,
             query_engine=query_engine,
             parsed_docs_path=parsed_docs_path,
-            return_agent_success=TASK_MANAGER_NAME,
-            return_agent_error=ERROR_MANAGER_NAME,
+            collection_name=collection_name,
+            custom_system_message=update_inner_agent_system_message.get("TaskManagerAgent"),
+            rag_config=rag_config,
         )
 
-        def execute_rag_query(context_variables: ContextVariables) -> ReplyResult:  # type: ignore[type-arg]
-            """Execute outstanding RAG queries, call the tool once for each outstanding query. Call this tool with no arguments.
-
-            Args:
-                context_variables: The current context variables containing queries to run
-
-            Returns:
-                ReplyResult: Contains query answer, updated context, and target agent
-            """
-            if len(context_variables["QueriesToRun"]) == 0:
-                return ReplyResult(
-                    target=AgentNameTarget(agent_name=TASK_MANAGER_NAME),
-                    message="No queries to run",
-                    context_variables=context_variables,
-                )
-
-            query = context_variables["QueriesToRun"][0].query
-            try:
-                if (
-                    hasattr(query_engine, "enable_query_citations")
-                    and query_engine.enable_query_citations
-                    and hasattr(query_engine, "query_with_citations")
-                    and callable(query_engine.query_with_citations)
-                ):
-                    answer_with_citations = query_engine.query_with_citations(query)  # type: ignore[union-attr]
-                    answer = answer_with_citations.answer
-                    txt_citations = [
-                        {
-                            "text_chunk": source.node.get_text(),
-                            "file_path": source.metadata["file_path"],
-                        }
-                        for source in answer_with_citations.citations
-                    ]
-                    logger.info(f"Citations:\n {txt_citations}")
-                else:
-                    answer = query_engine.query(query)
-                    txt_citations = []
-                context_variables["QueriesToRun"].pop(0)
-                context_variables["CompletedTaskCount"] += 1
-                context_variables["QueryResults"].append({"query": query, "answer": answer, "citations": txt_citations})
-
-                # Query completed
-
-                return ReplyResult(message=answer, context_variables=context_variables)
-            except Exception as e:
-                return ReplyResult(
-                    target=AgentNameTarget(agent_name=ERROR_MANAGER_NAME),
-                    message=f"Query failed for '{query}': {e}",
-                    context_variables=context_variables,
-                )
-
-        self._query_agent = ConversableAgent(
-            name="QueryAgent",
-            system_message="""
-            You are a query agent.
-            You answer the user's questions only using the query function provided to you.
-            You can only call use the execute_rag_query tool once per turn.
-            """,
-            llm_config=llm_config,
-            functions=[execute_rag_query],
-        )
+        def update_ingested_documents() -> None:
+            """Record documents listed under "DocumentsIngested" in the triage agent's context, skipping known ones."""
+            if not hasattr(self._triage_agent, "context_variables"):
+                return
+            newly_ingested = self._triage_agent.context_variables.get("DocumentsIngested", [])
+            for ingested_doc in (newly_ingested if newly_ingested is not None else ()):
+                if ingested_doc not in self.documents_ingested:
+                    self.documents_ingested.append(ingested_doc)
 
-        # Summary agent prompt will include the results of the ingestions and queries
         def create_summary_agent_prompt(agent: ConversableAgent, messages: list[dict[str, Any]]) -> str:
-            """Create the summary agent prompt and updates ingested documents.
-
-            Args:
-                agent: The conversable agent requesting the prompt
-                messages: List of conversation messages
-
-            Returns:
-                str: The summary agent system message with context information
-            """
+            """Sync ingested documents, then build the summary prompt from ``agent.context_variables``."""
             update_ingested_documents()
 
-            documents_to_ingest: list[Ingest] = cast(list[Ingest], agent.context_variables.get("DocumentsToIngest", []))
-            queries_to_run: list[Query] = cast(list[Query], agent.context_variables.get("QueriesToRun", []))
+            # cast() only informs the type checker; keys missing from the context default to empty lists
+            query_results = cast(list[dict[str, Any]], agent.context_variables.get("QueryResults", []))
+            documents_ingested = cast(list[str], agent.context_variables.get("DocumentsIngested", []))
+            documents_to_ingest = cast(list[Ingest], agent.context_variables.get("DocumentsToIngest", []))
+            queries_to_run = cast(list[Query], agent.context_variables.get("QueriesToRun", []))
 
             system_message = (
                 "You are a summary agent and you provide a summary of all completed tasks and the list of queries and their answers. "
                 "Output two sections: 'Ingestions:' and 'Queries:' with the results of the tasks. Number the ingestions and queries. "
                 "If there are no ingestions output 'No ingestions', if there are no queries output 'No queries' under their respective sections. "
                 "Don't add markdown formatting. "
-                "For each query, there is one answer and, optionally, a list of citations."
-                "For each citation, it contains two fields: 'text_chunk' and 'file_path'."
-                "Format the Query and Answers and Citations as 'Query:\nAnswer:\n\nCitations:'. Add a number to each query if more than one. Use the context below:\n"
-                "For each query, output the full citation contents and list them one by one,"
-                "format each citation as '\nSource [X] (chunk file_path here):\n\nChunk X:\n(text_chunk here)' and mark a separator between each citation using '\n#########################\n\n'."
-                "If there are no citations at all, DON'T INCLUDE ANY mention of citations.\n"
-                f"Documents ingested: {documents_to_ingest}\n"
-                f"Documents left to ingest: {len(documents_to_ingest)}\n"
-                f"Queries left to run: {len(queries_to_run)}\n"
-                f"Query and Answers and Citations: {queries_to_run}\n"
+                "For each query, there is one answer and, optionally, a list of citations. "
+                "For each citation, it contains two fields: 'text_chunk' and 'file_path'. "
+                "Format the Query and Answers and Citations as 'Query:\nAnswer:\n\nCitations:'. Add a number to each query if more than one. Use the context below:\n"
+                "For each query, output the full citation contents and list them one by one, "
+                "format each citation as '\nSource [X] (chunk file_path here):\n\nChunk X:\n(text_chunk here)' and mark a separator between each citation using '\n#########################\n\n'. "
+                "If there are no citations at all, DON'T INCLUDE ANY mention of citations. "
+                "Include any errors that occurred during processing.\n"
+                f"Documents ingested: {documents_ingested}\n"
+                f"Documents left to ingest: {len(documents_to_ingest)}\n"
+                f"Queries left to run: {len(queries_to_run)}\n"
+                f"Query Results: {query_results}\n"
             )
 
             return system_message
@@ -514,124 +188,118 @@ def create_summary_agent_prompt(agent: ConversableAgent, messages: list[dict[str
         self._summary_agent = ConversableAgent(
             name="SummaryAgent",
             llm_config=llm_config,
+            system_message=update_inner_agent_system_message.get("SummaryAgent"),
             update_agent_state_before_reply=[UpdateSystemMessage(create_summary_agent_prompt)],
         )
 
-        self._task_manager_agent.register_handoffs([
-            OnContextCondition(  # Go straight to data ingestion agent if we have documents to ingest
-                target=AgentTarget(agent=self._data_ingestion_agent),
-                condition=ExpressionContextCondition(
-                    expression=ContextExpression(expression="len(${DocumentsToIngest}) > 0")
-                ),
-            ),
-            OnContextCondition(  # Go to Query agent if we have queries to run (ingestion above run first)
-                target=AgentTarget(agent=self._query_agent),
-                condition=ExpressionContextCondition(
-                    expression=ContextExpression(expression="len(${QueriesToRun}) > 0")
-                ),
-            ),
-            # Removed automatic context condition - let task manager decide when to summarize
-            OnCondition(
-                target=AgentTarget(agent=self._summary_agent),
-                condition=StringLLMCondition(
-                    prompt="Call this function if all work is done and a summary will be created"
-                ),
-                available=SummaryTaskAvailableCondition(),  # Custom AvailableCondition class
-            ),
-        ])
-        self._task_manager_agent.handoffs.set_after_work(target=StayTarget())
-
-        self._data_ingestion_agent.handoffs.set_after_work(target=AgentTarget(agent=self._task_manager_agent))
-
-        self._query_agent.handoffs.set_after_work(target=AgentTarget(agent=self._task_manager_agent))
-
-        # Summary agent terminates the DocumentAgent
+        # Set up handoffs - simplified flow
+        self._triage_agent.handoffs.set_after_work(target=AgentTarget(agent=self._task_manager_agent))
+        self._task_manager_agent.handoffs.set_after_work(target=AgentTarget(agent=self._summary_agent))
         self._summary_agent.handoffs.set_after_work(target=TerminateTarget())
 
-        # The Error Agent always terminates the DocumentAgent
-        self._error_agent.handoffs.set_after_work(target=TerminateTarget())
-
-        self.register_reply([Agent, None], DocAgent.generate_inner_group_chat_reply)
-
         self.documents_ingested: list[str] = []
         self._group_chat_context_variables: ContextVariables | None = None
 
     def generate_inner_group_chat_reply(
         self,
-        messages: list[dict[str, Any]] | str | None = None,
+        messages: list[dict[str, Any]] | None = None,
         sender: Agent | None = None,
-        config: OpenAIWrapper | None = None,
+        config: Any = None,
     ) -> tuple[bool, str | dict[str, Any] | None]:
-        """Reply function that generates the inner group chat reply for the DocAgent.
+        """Reply function that generates the inner group chat reply for the DocAgent."""
+        try:
+            # Initialize or reuse context variables
+            if hasattr(self, "_group_chat_context_variables") and self._group_chat_context_variables is not None:
+                context_variables = self._group_chat_context_variables
+                # Reset pending tasks for new run
+                context_variables["DocumentsToIngest"] = []
+            else:
+                context_variables = ContextVariables(
+                    data={
+                        "CompletedTaskCount": 0,
+                        "DocumentsToIngest": [],
+                        "DocumentsIngested": self.documents_ingested,
+                        "QueriesToRun": [],
+                        "QueryResults": [],
+                    }
+                )
+                self._group_chat_context_variables = context_variables
 
-        Args:
-            messages: Input messages to process
-            sender: The agent that sent the message
-            config: OpenAI wrapper configuration
+            if messages and len(messages) > 0:
+                last_message = messages[-1]
+                if (
+                    isinstance(last_message, dict)
+                    and last_message.get("name") == "DocumentTriageAgent"
+                    and "content" in last_message
+                    and isinstance(last_message["content"], str)
+                ):
+                    try:
+                        import json
+
+                        document_task_data = json.loads(last_message["content"])
+
+                        # Extract ingestions and queries
+                        ingestions = [Ingest(**ing) for ing in document_task_data.get("ingestions", [])]
+                        queries = [Query(**q) for q in document_task_data.get("queries", [])]
+
+                        # Update context variables with new tasks
+                        existing_ingestions = context_variables.get("DocumentsToIngest", []) or []
+                        existing_queries = context_variables.get("QueriesToRun", []) or []
+                        documents_ingested = context_variables.get("DocumentsIngested", []) or []
+
+                        # Deduplicate and add new ingestions
+                        for ingestion in ingestions:
+                            if (
+                                ingestion.path_or_url not in [ing.path_or_url for ing in existing_ingestions]
+                                and ingestion.path_or_url not in documents_ingested
+                            ):
+                                existing_ingestions.append(ingestion)
+
+                        # Deduplicate and add new queries
+                        for query in queries:
+                            if query.query not in [q.query for q in existing_queries]:
+                                existing_queries.append(query)
+
+                        context_variables["DocumentsToIngest"] = existing_ingestions
+                        context_variables["QueriesToRun"] = existing_queries
+                        context_variables["TaskInitiated"] = True
+
+                        logger.info(f"Processed triage output: {len(ingestions)} ingestions, {len(queries)} queries")
+
+                    except json.JSONDecodeError as e:
+                        logger.warning(f"Failed to parse triage output JSON: {e}")
+                    except Exception as e:
+                        logger.warning(f"Failed to process triage output: {e}")
+
+            group_chat_agents = [
+                self._triage_agent,
+                self._task_manager_agent,
+                self._summary_agent,
+            ]
 
-        Returns:
-            tuple: (should_terminate, reply_message)
-        """
-        # Use existing context_variables if available, otherwise create new ones
-        if hasattr(self, "_group_chat_context_variables") and self._group_chat_context_variables is not None:
-            context_variables = self._group_chat_context_variables
-            # Reset for the new run
-            context_variables["DocumentsToIngest"] = []  # type: ignore[index]
-        else:
-            context_variables = ContextVariables(
-                data={
-                    "CompletedTaskCount": 0,
-                    "DocumentsToIngest": [],
-                    "DocumentsIngested": self.documents_ingested,
-                    "QueriesToRun": [],
-                    "QueryResults": [],
-                }
+            agent_pattern = DefaultPattern(
+                initial_agent=self._triage_agent,
+                agents=group_chat_agents,
+                context_variables=context_variables,
+                group_after_work=TerminateTarget(),
             )
-            self._group_chat_context_variables = context_variables
-
-        group_chat_agents = [
-            self._triage_agent,
-            self._task_manager_agent,
-            self._data_ingestion_agent,
-            self._query_agent,
-            self._summary_agent,
-            self._error_agent,
-        ]
-
-        agent_pattern = DefaultPattern(
-            initial_agent=self._triage_agent,
-            agents=group_chat_agents,
-            context_variables=context_variables,
-            group_after_work=TerminateTarget(),
-        )
-
-        chat_result, context_variables, last_speaker = initiate_group_chat(
-            pattern=agent_pattern,
-            messages=self._get_document_input_message(messages),
-        )
-        if last_speaker == self._error_agent:
-            # If we finish with the error agent, we return their message which contains the error
-            return True, chat_result.summary
-        if last_speaker != self._summary_agent:
-            # If the group chat finished but not with the summary agent, we assume something has gone wrong with the flow
-            return True, DEFAULT_ERROR_GROUP_CHAT_MESSAGE
 
-        return True, chat_result.summary
-
-    def _get_document_input_message(self, messages: list[dict[str, Any]] | str | None) -> str:  # type: ignore[type-arg]
-        """Gets and validates the input message(s) for the document agent.
+            chat_result, context_variables, last_speaker = initiate_group_chat(
+                pattern=agent_pattern,
+                messages=self._get_document_input_message(messages),
+            )
 
-        Args:
-            messages: Input messages as string or list of message dictionaries
+            # Always return the final result since we only have summary termination
+            return True, chat_result.summary
 
-        Returns:
-            str: The extracted message content
+        except Exception as e:
+            logger.error(f"Critical error in DocAgent group chat: {e}")
+            return True, f"Error processing request: {str(e)}"
 
-        Raises:
-            NotImplementedError: If messages format is invalid
-        """
-        if isinstance(messages, str):
-            return messages
+    def _get_document_input_message(self, messages: list[dict[str, Any]] | None) -> str:
+        """Gets and validates the input message(s) for the document agent."""
+        if messages is None or len(messages) == 0:
+            return ""
         elif (
             isinstance(messages, list)
             and len(messages) > 0
@@ -640,4 +308,4 @@ def _get_document_input_message(self, messages: list[dict[str, Any]] | str | Non
         ):
             return messages[-1]["content"]
         else:
-            raise NotImplementedError("Invalid messages format. Must be a list of messages or a string.")
+            raise NotImplementedError("Invalid messages format. Must be a list of messages.")
diff --git a/autogen/agents/experimental/document_agent/parser_utils.py b/autogen/agents/experimental/document_agent/parser_utils.py
index 5464edefc23..60857196305 100644
--- a/autogen/agents/experimental/document_agent/parser_utils.py
+++ b/autogen/agents/experimental/document_agent/parser_utils.py
@@ -22,6 +22,17 @@
     )
     from docling.document_converter import DocumentConverter, PdfFormatOption
 
+    # Import with fallback for potentially unexported attributes
+    try:
+        from docling.datamodel.pipeline_options import (  # type: ignore[attr-defined]
+            AcceleratorDevice,
+            AcceleratorOptions,
+        )
+    except ImportError:
+        # Fallback: bind both names to None when this docling version does not export them
+        AcceleratorDevice = None  # type: ignore
+        AcceleratorOptions = None  # type: ignore
+
 __all__ = ["docling_parse_docs"]
 
 logger = logging.getLogger(__name__)
@@ -83,7 +94,9 @@ def docling_parse_docs(  # type: ignore[no-any-unimported]
     pdf_pipeline_options.do_table_structure = True
     pdf_pipeline_options.table_structure_options.do_cell_matching = True
     pdf_pipeline_options.ocr_options.lang = ["en"]
-    pdf_pipeline_options.accelerator_options = AcceleratorOptions(num_threads=4, device=AcceleratorDevice.AUTO)
+    # Only configure accelerator options when docling exported both names (they are None otherwise)
+    if AcceleratorOptions is not None and AcceleratorDevice is not None:
+        pdf_pipeline_options.accelerator_options = AcceleratorOptions(num_threads=4, device=AcceleratorDevice.AUTO)
 
     doc_converter = DocumentConverter(
         format_options={
diff --git a/autogen/agents/experimental/document_agent/task_manager.py b/autogen/agents/experimental/document_agent/task_manager.py
new file mode 100644
index 00000000000..079c65a4a84
--- /dev/null
+++ b/autogen/agents/experimental/document_agent/task_manager.py
@@ -0,0 +1,373 @@
+# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import asyncio
+import logging
+import threading
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Any
+
+from autogen import ConversableAgent
+from autogen.agentchat.contrib.rag.query_engine import RAGQueryEngine
+from autogen.agentchat.group.context_variables import ContextVariables
+from autogen.agentchat.group.reply_result import ReplyResult
+from autogen.agents.experimental.document_agent.chroma_query_engine import VectorChromaQueryEngine
+from autogen.agents.experimental.document_agent.task_manager_utils import (
+    execute_single_query,
+    process_single_document,
+)
+from autogen.doc_utils import export_module
+from autogen.llm_config import LLMConfig
+
+__all__ = ["TaskManagerAgent"]
+
+logger = logging.getLogger(__name__)
+
+TASK_MANAGER_SYSTEM_MESSAGE = """
+You are task manager agent responsible for processing document ingestion and query tasks.
+
+# INSTRUCTIONS:
+1) You are provided provided with tools use those tools to process the tasks.
+2) ingest_documents: (tool) For processing document ingestion tasks (takes list of paths/URLs)
+3) execute_query: (tool) For answering queries using the RAG system (takes list of query strings)
+
+# TASK FLOW (examples):
+1)  Query: "Please ingest this PDF file: /path/to/document.pdf"
+ TaskManager → ingest_documents -> summary agent
+2) Query: "What is machine learning?"
+ TaskManager → execute_query -> summary agent
+3) Query: "ingest document A.pdf, What is machine learning?"
+ TaskManager → ingest_documents -> execute_query -> summary agent
+
+# IMPORTANT:
+- Strictly follow the Instruction.
+- Use Task flows as reference for tool call Execution.
+"""
+
+
+@export_module("autogen.agents.experimental")
+class TaskManagerAgent(ConversableAgent):
+    """TaskManagerAgent with integrated tools for document ingestion and query processing."""
+
+    def __init__(
+        self,
+        name: str = "TaskManagerAgent",
+        llm_config: LLMConfig | dict[str, Any] | None = None,
+        query_engine: RAGQueryEngine | None = None,
+        parsed_docs_path: Path | str | None = None,
+        return_agent_success: str = "TaskManagerAgent",
+        return_agent_error: str = "SummaryAgent",
+        collection_name: str | None = None,
+        max_workers: int | None = None,
+        custom_system_message: str | None = None,
+        rag_config: dict[str, dict[str, Any]] | None = None,
+    ):
+        """Initialize the TaskManagerAgent.
+
+        Args:
+            name: The name of the agent
+            llm_config: The configuration for the LLM
+            query_engine: The RAG query engine for document operations
+            parsed_docs_path: Path where parsed documents will be stored
+            return_agent_success: The agent to return on successful completion of the task
+            return_agent_error: The agent to return on error
+            collection_name: The collection name for the RAG query engine
+            max_workers: Maximum number of threads for concurrent processing (None for default)
+            custom_system_message: Custom system message for the TaskManagerAgent
+            rag_config: Configuration for RAG engines {"vector": {}, "graph": {...}}
+        """
+        self.rag_config = rag_config or {"vector": {}}  # Default to vector only
+        self.parsed_docs_path = Path(parsed_docs_path) if parsed_docs_path else Path("./parsed_docs")
+        self.executor = ThreadPoolExecutor(max_workers=max_workers)
+        self._temp_citations_store: dict[str, list[dict[str, str]]] = {}
+        self._context_lock = threading.Lock()
+
+        # Initialize RAG engines
+        self.rag_engines = self._create_rag_engines(collection_name)
+
+        # Keep backward compatibility
+        self.query_engine = query_engine if query_engine else self.rag_engines.get("vector")
+
+        async def ingest_documents(
+            documents_to_ingest: list[str], context_variables: ContextVariables
+        ) -> ReplyResult | str:
+            """Ingest documents from the provided list using concurrent processing.
+
+            Args:
+                documents_to_ingest: List of document paths or URLs to ingest
+                context_variables: Context variables to store ingestion state
+
+            Returns:
+                ReplyResult: Status message about the ingestion process, carrying the updated context variables
+            """
+            # Add input validation
+            if not documents_to_ingest:
+                return ReplyResult(
+                    message="No documents provided for ingestion",
+                    context_variables=context_variables,
+                )
+
+            # Validate document paths/URLs
+            valid_documents = []
+            for doc_path in documents_to_ingest:
+                if isinstance(doc_path, str) and doc_path.strip():
+                    valid_documents.append(doc_path.strip())
+                else:
+                    logger.warning(f"Invalid document path: {doc_path}")
+
+            if not valid_documents:
+                return ReplyResult(
+                    message="No valid documents found for ingestion",
+                    context_variables=context_variables,
+                )
+
+            # Safely handle context variable initialization
+            if "DocumentsToIngest" not in context_variables:
+                context_variables["DocumentsToIngest"] = []
+            if "DocumentsIngested" not in context_variables:
+                context_variables["DocumentsIngested"] = []
+            if "CompletedTaskCount" not in context_variables:
+                context_variables["CompletedTaskCount"] = 0
+            if "QueriesToRun" not in context_variables:
+                context_variables["QueriesToRun"] = []
+
+            # Add current batch to pending ingestions
+            context_variables["DocumentsToIngest"].append(valid_documents)
+
+            try:
+                # Process documents concurrently using ThreadPoolExecutor
+                loop = asyncio.get_event_loop()
+                futures = [
+                    loop.run_in_executor(
+                        self.executor,
+                        process_single_document,
+                        doc_path,
+                        self.parsed_docs_path,
+                        self.rag_config,
+                        self.rag_engines,
+                    )
+                    for doc_path in valid_documents
+                ]
+
+                # Wait for all documents to be processed
+                results = await asyncio.gather(*futures, return_exceptions=True)
+
+                successfully_ingested = []
+                for result in results:
+                    if isinstance(result, Exception):
+                        logger.warning(f"Document processing failed with exception: {result}")
+                        continue
+
+                    # Type check to ensure result is the expected tuple
+                    if isinstance(result, tuple) and len(result) == 3:
+                        doc_path, success, error_msg = result
+                        if success:
+                            successfully_ingested.append(doc_path)
+                        else:
+                            logger.warning(f"Failed to ingest document {doc_path}: {error_msg}")
+                    else:
+                        logger.warning(f"Unexpected result format: {result}")
+
+                # Enhanced logging with agent and tool title
+                logger.info("=" * 80)
+                logger.info("TOOL: ingest_documents (CONCURRENT)")
+                logger.info("AGENT: TaskManagerAgent")
+                logger.info(f"DOCUMENTS: {valid_documents}")
+                logger.info(f"SUCCESSFULLY INGESTED: {successfully_ingested}")
+                logger.info("=" * 80)
+
+                # Update context variables with successful ingestions
+                if successfully_ingested:
+                    context_variables["DocumentsIngested"].append(successfully_ingested)
+                    context_variables["CompletedTaskCount"] += 1
+
+                # Clear processed tasks
+                context_variables["DocumentsToIngest"] = []
+                context_variables["QueriesToRun"] = []
+
+                return ReplyResult(
+                    message=f"Documents ingested successfully: {successfully_ingested}",
+                    context_variables=context_variables,
+                )
+
+            except Exception as e:
+                # Enhanced error logging
+                logger.error("=" * 80)
+                logger.error("TOOL ERROR: ingest_documents (CONCURRENT)")
+                logger.error("AGENT: TaskManagerAgent")
+                logger.error(f"ERROR: {e}")
+                logger.error(f"DOCUMENTS: {valid_documents}")
+                logger.error("=" * 80)
+
+                # Preserve failed documents for retry
+                context_variables["DocumentsToIngest"] = [valid_documents]
+                return ReplyResult(
+                    message=f"Documents ingestion failed: {e}",
+                    context_variables=context_variables,
+                )
+
+        async def execute_query(queries_to_run: list[str], context_variables: ContextVariables) -> ReplyResult | str:
+            """Execute queries from the provided list using concurrent processing.
+
+            Args:
+                queries_to_run: List of queries to execute
+                context_variables: Context variables to store query state
+
+            Returns:
+                ReplyResult | str: The joined answers or an error message with the context variables; a plain string when no valid queries are given
+            """
+            if not queries_to_run:
+                return "No queries to run"
+
+            # Validate queries
+            valid_queries = [q.strip() for q in queries_to_run if isinstance(q, str) and q.strip()]
+            if not valid_queries:
+                return "No valid queries provided"
+
+            # Safely handle context variable initialization
+            if "QueriesToRun" not in context_variables:
+                context_variables["QueriesToRun"] = []
+            if "CompletedTaskCount" not in context_variables:
+                context_variables["CompletedTaskCount"] = 0
+            if "QueryResults" not in context_variables:
+                context_variables["QueryResults"] = []
+            if "Citations" not in context_variables:
+                context_variables["Citations"] = []
+
+            # Add current batch to pending queries
+            context_variables["QueriesToRun"].append(valid_queries)
+
+            try:
+                # Clear temporary citations store before processing
+                self._temp_citations_store = {}
+
+                # Process queries concurrently using ThreadPoolExecutor
+                loop = asyncio.get_event_loop()
+                futures = [
+                    loop.run_in_executor(self.executor, execute_single_query, query, self.rag_config, self.rag_engines)
+                    for query in valid_queries
+                ]
+
+                # Wait for all queries to be processed
+                results = await asyncio.gather(*futures, return_exceptions=True)
+
+                answers = []
+                all_citations: list[list[dict[str, str]] | None] = []
+
+                for i, result in enumerate(results):
+                    if isinstance(result, Exception):
+                        logger.warning(f"Query processing failed with exception: {result}")
+                        answers.append(f"Query processing failed: {result}")
+                        all_citations.append(None)
+                        continue
+
+                    # Type check to ensure result is the expected tuple
+                    if isinstance(result, tuple) and len(result) == 2:
+                        query_text, answer = result
+                        answers.append(answer)
+
+                        # Get citations from temporary store if available
+                        citations = self._temp_citations_store.get(query_text, None)
+                        all_citations.append(citations)
+                    else:
+                        logger.warning(f"Unexpected result format: {result}")
+                        answers.append(f"Unexpected result format: {result}")
+                        all_citations.append(None)
+
+                # Enhanced logging with agent and tool title
+                logger.info("=" * 80)
+                logger.info("TOOL: execute_query (CONCURRENT)")
+                logger.info("AGENT: TaskManagerAgent")
+                logger.info(f"QUERIES: {valid_queries}")
+                logger.info("=" * 80)
+
+                # Update context variables
+                context_variables["QueriesToRun"].pop(0)  # Remove processed batch
+                context_variables["CompletedTaskCount"] += 1
+
+                # Store query results with citations
+                query_result = {"query": valid_queries, "answer": answers, "citations": all_citations}
+                context_variables["QueryResults"].append(query_result)
+                # Clear temporary citations store after processing
+                self._temp_citations_store = {}
+
+                return ReplyResult(
+                    message="\n\n".join(answers),
+                    context_variables=context_variables,
+                )
+
+            except Exception as e:
+                error_msg = f"Query failed for queries '{valid_queries}': {str(e)}"
+
+                # Enhanced error logging
+                logger.error("=" * 80)
+                logger.error("TOOL ERROR: execute_query (CONCURRENT)")
+                logger.error("AGENT: TaskManagerAgent")
+                logger.error(f"QUERIES: {valid_queries}")
+                logger.error(f"ERROR: {e}")
+                logger.error("=" * 80)
+
+                return ReplyResult(
+                    message=error_msg,
+                    context_variables=context_variables,
+                )
+
+        # Use custom system message if provided, otherwise use default
+        system_message = custom_system_message if custom_system_message else TASK_MANAGER_SYSTEM_MESSAGE
+
+        super().__init__(
+            name=name,
+            system_message=system_message,
+            llm_config=llm_config,
+            functions=[ingest_documents, execute_query],  # Add initiate_tasks
+        )
+
+    def __del__(self) -> None:
+        """Clean up the ThreadPoolExecutor when the agent is destroyed."""
+        if hasattr(self, "executor") and self.executor is not None:
+            try:
+                self.executor.shutdown(wait=False)  # Don't block in destructor
+            except Exception as e:
+                logger.warning(f"Error shutting down executor: {e}")
+
+    def _create_rag_engines(self, collection_name: str | None = None) -> dict[str, Any]:
+        """Create RAG engines based on rag_config."""
+        engines = {}
+
+        for rag_type, config in self.rag_config.items():
+            if rag_type == "vector":
+                engines["vector"] = config.get(
+                    "engine",
+                    VectorChromaQueryEngine(
+                        collection_name=collection_name if collection_name else "default_collection",
+                    ),
+                )
+            elif rag_type == "graph":
+                engines["graph"] = self._create_neo4j_engine(config)
+
+        return engines
+
+    def _create_neo4j_engine(self, config: dict[str, Any]) -> Any:
+        """Create Neo4j graph query engine."""
+        try:
+            from autogen.agentchat.contrib.graph_rag.neo4j_graph_query_engine import Neo4jGraphQueryEngine
+
+            return Neo4jGraphQueryEngine(
+                host=config.get("host", "bolt://localhost"),
+                port=config.get("port", 7687),
+                database=config.get("database", "neo4j"),
+                username=config.get("username", "neo4j"),
+                password=config.get("password", "neo4j"),
+                llm=config.get("llm"),
+                embedding=config.get("embedding"),
+            )
+        except ImportError as e:
+            logger.warning(f"Neo4j dependencies not available: {e}. Skipping graph engine.")
+            return None
+
+    def _safe_context_update(self, context_variables: ContextVariables, key: str, value: Any) -> None:
+        """Thread-safe context variable update."""
+        with self._context_lock:
+            context_variables[key] = value
diff --git a/autogen/agents/experimental/document_agent/task_manager_utils.py b/autogen/agents/experimental/document_agent/task_manager_utils.py
new file mode 100644
index 00000000000..d10e3f27fd1
--- /dev/null
+++ b/autogen/agents/experimental/document_agent/task_manager_utils.py
@@ -0,0 +1,266 @@
+# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import tempfile
+from pathlib import Path
+from typing import Any
+
+import fitz  # type: ignore
+import requests
+import urllib3
+
+from autogen.agentchat.contrib.capabilities.text_compressors import LLMLingua
+from autogen.agentchat.contrib.capabilities.transforms import TextMessageCompressor
+from autogen.agents.experimental.document_agent.parser_utils import docling_parse_docs
+
+logger = logging.getLogger(__name__)
+
+
+def extract_text_from_pdf(doc_path: str) -> list[dict[str, str]]:
+    """Extract the text of a PDF and return it compressed with LLMLingua.
+
+    Args:
+        doc_path: URL (any string with a scheme) or local file path of the PDF.
+
+    Returns:
+        The result of ``TextMessageCompressor.apply_transform`` on the page text.
+
+    Raises:
+        ValueError: If ``doc_path`` is not a string.
+        requests.RequestException: If downloading a URL fails or times out.
+    """
+    if not isinstance(doc_path, str):
+        raise ValueError("doc_path must be a string or a URL")
+    if urllib3.util.url.parse_url(doc_path).scheme:
+        # Bounded wait so an unresponsive server cannot block the caller indefinitely.
+        response = requests.get(doc_path, timeout=30)
+        response.raise_for_status()  # Ensure the download was successful
+        pdf = fitz.open(stream=response.content, filetype="pdf")
+    else:
+        # A string without a scheme is treated as a local file path.
+        pdf = fitz.open(doc_path)
+    with pdf as doc:
+        text = "".join(page.get_text() for page in doc)
+    return TextMessageCompressor(text_compressor=LLMLingua()).apply_transform([{"content": text}])
+
+
+def aggregate_rag_results(query: str, results: dict[str, Any]) -> str:
+    """Render the per-engine answers (and any citations) for ``query`` as one text block.
+
+    ``results`` maps an engine name to a dict with ``answer`` and optional ``citations``.
+    """
+    if not results:
+        return f"Query: {query}\nAnswer: No results found from any RAG engine."
+
+    lines = [f"Query: {query}"]
+    for label, payload in results.items():
+        lines += [f"\n{label.upper()} Results:", f"Answer: {payload.get('answer', 'No answer available')}"]
+        sources = payload["citations"] if "citations" in payload else None
+        if sources:
+            lines.append("Citations:")
+            lines.extend(
+                f"  [{number}] {source.get('file_path', 'Unknown')}" for number, source in enumerate(sources, 1)
+            )
+
+    return "\n".join(lines)
+
+
+def compress_and_save_text(text: str, input_path: str, parsed_docs_path: Path) -> str:
+    """Compress ``text`` with LLMLingua and write it to ``<parsed_docs_path>/<input stem>.md``.
+
+    Returns the written file's path as a string. Raises ``ValueError`` when the
+    compressor yields nothing or a first entry with empty ``content``.
+    """
+    compressor = TextMessageCompressor(text_compressor=LLMLingua())
+    transformed = compressor.apply_transform([{"content": text}])
+    if not (transformed and transformed[0].get("content")):
+        raise ValueError("Text compression failed or returned empty result")
+
+    # The markdown file is named after the stem of the input document.
+    target = parsed_docs_path / f"{Path(input_path).stem}.md"
+    parsed_docs_path.mkdir(parents=True, exist_ok=True)
+    with open(target, "w", encoding="utf-8") as handle:
+        handle.write(transformed[0]["content"])
+    return str(target)
+
+
+def ingest_to_engines(
+    output_file: str, input_path: str, rag_config: dict[str, Any], rag_engines: dict[str, Any]
+) -> None:
+    """Add the parsed ``output_file`` to every engine named in ``rag_config``.
+
+    Engines missing from ``rag_engines`` (or mapped to ``None``) are skipped, and a
+    failure in one engine is logged as a warning without stopping the others.
+    """
+    from autogen.agentchat.contrib.graph_rag.document import Document, DocumentType
+
+    # The document type is derived from the extension of the original input path.
+    lowered = input_path.lower()
+    if lowered.endswith(".pdf"):
+        kind = DocumentType.PDF
+    elif lowered.endswith((".html", ".htm")):
+        kind = DocumentType.HTML
+    elif lowered.endswith(".json"):
+        kind = DocumentType.JSON
+    else:
+        kind = DocumentType.TEXT
+    document = Document(doctype=kind, path_or_url=output_file, data=None)
+
+    for rag_type in rag_config:
+        engine = rag_engines.get(rag_type)
+        if engine is None:
+            continue
+        try:
+            if rag_type == "vector":
+                engine.add_docs(new_doc_paths_or_urls=[output_file])
+            elif rag_type == "graph" and not hasattr(engine, "_initialized"):
+                # First graph ingestion: build the database, then mark the engine.
+                engine.init_db([document])
+                engine._initialized = True
+            elif rag_type == "graph" and hasattr(engine, "add_records"):
+                # Later ingestions add records to the existing graph when supported.
+                engine.add_records([document])
+        except Exception as e:
+            logger.warning(f"Failed to ingest to {rag_type} engine: {e}")
+
+
+# Seconds to wait for a remote PDF; bounds the wait on an unresponsive server.
+_PDF_DOWNLOAD_TIMEOUT = 30
+
+
+def _read_pdf_text(path_or_url: str) -> str:
+    """Return the concatenated text of every page of a PDF, read with PyMuPDF.
+
+    Args:
+        path_or_url: A URL (any string with a scheme) or a local file path.
+
+    Returns:
+        The text of all pages, joined in page order.
+
+    Raises:
+        requests.RequestException: If the download fails, times out or returns an error status.
+    """
+    if urllib3.util.url.parse_url(path_or_url).scheme:
+        # Remote PDF: download it and parse the bytes directly from memory.
+        response = requests.get(path_or_url, timeout=_PDF_DOWNLOAD_TIMEOUT)
+        response.raise_for_status()
+        pdf = fitz.open(stream=response.content, filetype="pdf")
+    else:
+        pdf = fitz.open(path_or_url)
+    with pdf as doc:
+        return "".join(page.get_text() for page in doc)
+
+
+def process_single_document(
+    input_file_path: str, parsed_docs_path: Path, rag_config: dict[str, Any], rag_engines: dict[str, Any]
+) -> tuple[str, bool, str]:
+    """Parse one document and ingest the result into the configured RAG engines.
+
+    Inputs whose name ends in ``.pdf`` (local path or URL) are read with PyMuPDF,
+    compressed and saved as markdown; anything else is converted by docling.
+    Both branches end by passing the markdown file to ``ingest_to_engines``.
+
+    Args:
+        input_file_path: Local path or URL of the document.
+        parsed_docs_path: Directory that receives the generated markdown.
+        rag_config: Mapping whose keys name the RAG engines to ingest into.
+        rag_engines: Engine instances keyed by RAG type.
+
+    Returns:
+        ``(input_file_path, success, error_message)``. The message is empty on
+        success; exceptions are not raised but reported through the tuple.
+    """
+    try:
+        # The suffix test alone covers URLs too, so no separate URL check is needed.
+        is_pdf = isinstance(input_file_path, str) and input_file_path.lower().endswith(".pdf")
+
+        if is_pdf:
+            # Handle PDF with PyMuPDF, then compress and save the text as markdown.
+            logger.info("PDF found, using PyMuPDF for extraction")
+            text = _read_pdf_text(input_file_path)
+            markdown_path = compress_and_save_text(text, input_file_path, parsed_docs_path)
+        else:
+            # Handle non-PDF documents with docling
+            output_files = docling_parse_docs(
+                input_file_path=input_file_path,
+                output_dir_path=parsed_docs_path,
+                output_formats=["markdown"],
+            )
+
+            # Limit to one output markdown file for now.
+            if not output_files or output_files[0].suffix != ".md":
+                return (input_file_path, False, "No valid markdown output generated")
+            markdown_path = str(output_files[0])
+
+        # Ingest to all active engines
+        ingest_to_engines(markdown_path, input_file_path, rag_config, rag_engines)
+        return (input_file_path, True, "")
+
+    except Exception as doc_error:
+        # Best-effort processing: report the failure to the caller instead of raising.
+        return (input_file_path, False, str(doc_error))
+
+
+def execute_single_query(query_text: str, rag_config: dict[str, Any], rag_engines: dict[str, Any]) -> tuple[str, str]:
+    """Run ``query_text`` against each engine named in ``rag_config`` and merge the answers.
+
+    Engines absent from ``rag_engines`` are skipped. An engine that fails contributes an
+    error string as its answer instead of aborting the remaining engines.
+
+    Returns:
+        ``(query_text, text)`` where ``text`` is built by ``aggregate_rag_results``, or an
+        error message if an exception escapes the per-engine handling.
+    """
+    try:
+        collected = {}
+        # Only engines listed in rag_config are consulted.
+        for rag_type in rag_config:
+            engine = rag_engines.get(rag_type)
+            if engine is None:
+                continue
+
+            try:
+                if rag_type == "vector":
+                    # Citations are used only when the engine enables them and can serve them.
+                    wants_citations = getattr(engine, "enable_query_citations", False)
+                    cite = getattr(engine, "query_with_citations", None) if wants_citations else None
+                    if callable(cite):
+                        cited = cite(query_text)
+                        answer = cited.answer
+                        txt_citations = [
+                            {
+                                "text_chunk": source.node.get_text(),
+                                "file_path": source.metadata.get("file_path", "Unknown"),
+                            }
+                            for source in cited.citations
+                        ]
+                        collected[rag_type] = {"answer": answer, "citations": txt_citations}
+                        logger.info(f"Vector Citations: {txt_citations}")
+                    else:
+                        answer = engine.query(query_text) if engine else "Vector engine not available"
+                        collected[rag_type] = {"answer": answer}
+
+                elif rag_type == "graph":
+                    # An engine without an ``index`` attribute is connected to the existing graph first.
+                    if not hasattr(engine, "index"):
+                        try:
+                            engine.connect_db()
+                            logger.info("Connected to existing Neo4j graph for querying")
+                        except Exception as connect_error:
+                            logger.warning(f"Failed to connect to Neo4j graph: {connect_error}")
+                            collected[rag_type] = {"answer": f"Error connecting to graph: {connect_error}"}
+                            continue
+
+                    outcome = engine.query(query_text)
+                    collected[rag_type] = {"answer": outcome.answer, "results": outcome.results}
+
+            except Exception as engine_error:
+                logger.warning(f"Failed to query {rag_type} engine: {engine_error}")
+                collected[rag_type] = {"answer": f"Error querying {rag_type}: {engine_error}"}
+
+        return (query_text, aggregate_rag_results(query_text, collected))
+    except Exception as query_error:
+        logger.warning(f"Failed to execute query '{query_text}': {query_error}")
+        return (query_text, f"Query: {query_text}\nAnswer: Error executing query: {query_error}")
diff --git a/pyproject.toml b/pyproject.toml
index 37aef3a3274..2fe093efc0c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -294,7 +294,7 @@ docs = [
     # ToDo: currently problematic and cannot be upgraded
     "mkdocs-git-revision-date-localized-plugin==1.4.7",
     "mike==2.1.3",
-    "typer==0.17.4",
+    "typer>=0.16.0,<0.17.0",
     "mkdocs-minify-plugin==0.8.0",
     "mkdocs-macros-plugin==1.3.9",  # includes with variables
     "mkdocs-glightbox==0.5.1",  # img zoom
@@ -307,6 +307,8 @@ docs = [
     "termcolor==3.1.0",
     "nbclient==0.10.2",
     "mcp>=1.11.0",
+    "pymupdf>=1.24.4",
+    "llmlingua>=0.2.2"
 ]
 
 types = [
diff --git a/test/agentchat/realtime_agent/realtime_test_utils.py b/test/agentchat/realtime_agent/realtime_test_utils.py
index f0989fa39f8..990d575a4db 100644
--- a/test/agentchat/realtime_agent/realtime_test_utils.py
+++ b/test/agentchat/realtime_agent/realtime_test_utils.py
@@ -13,8 +13,7 @@
 from autogen.import_utils import optional_import_block
 
 with optional_import_block() as result:
-    from openai import OpenAI
-    from openai._types import Omit
+    from openai import Omit, OpenAI
 
 __all__ = ["text_to_speech", "trace"]
 
diff --git a/test/agents/experimental/document_agent/test_docagent.py b/test/agents/experimental/document_agent/test_docagent.py
deleted file mode 100644
index 7849d74c2aa..00000000000
--- a/test/agents/experimental/document_agent/test_docagent.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from pathlib import Path
-
-import pytest
-
-from autogen.agents.experimental.document_agent.document_agent import (
-    DocAgent,
-    DocumentTask,
-    DocumentTriageAgent,
-)
-from autogen.import_utils import run_for_optional_imports, skip_on_missing_imports
-from test.credentials import Credentials
-
-
-@run_for_optional_imports(["openai"], "openai")
-def test_document_triage_agent_init(credentials_gpt_4o_mini: Credentials) -> None:
-    llm_config = credentials_gpt_4o_mini.llm_config
-    triage_agent = DocumentTriageAgent(llm_config)
-    assert triage_agent.llm_config["response_format"] == DocumentTask  # type: ignore [index]
-
-
-@pytest.mark.openai
-@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
-def test_document_agent_init(credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
-    llm_config = credentials_gpt_4o_mini.llm_config
-    document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
-
-    assert hasattr(document_agent, "_task_manager_agent")
-    assert hasattr(document_agent, "_triage_agent")
-    assert hasattr(document_agent, "_data_ingestion_agent")
-    assert hasattr(document_agent, "_query_agent")
-    assert hasattr(document_agent, "_error_agent")
-    assert hasattr(document_agent, "_summary_agent")
diff --git a/test/agents/experimental/document_agent/test_document_agent.py b/test/agents/experimental/document_agent/test_document_agent.py
new file mode 100644
index 00000000000..cf9be539fd6
--- /dev/null
+++ b/test/agents/experimental/document_agent/test_document_agent.py
@@ -0,0 +1,336 @@
+# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from autogen.agents.experimental.document_agent.document_agent import (
+    DEFAULT_SYSTEM_MESSAGE,
+    DocAgent,
+    DocumentTask,
+    DocumentTriageAgent,
+)
+from autogen.agents.experimental.document_agent.document_utils import Ingest, Query, QueryType
+from autogen.import_utils import run_for_optional_imports, skip_on_missing_imports
+
+from ....conftest import Credentials
+
+
+class TestDocumentTask:
+    """Checks of the DocumentTask model: construction, empty inputs and dict round-trip."""
+
+    def test_document_task_init(self) -> None:
+        """A task keeps the single ingestion and the single query it was built from."""
+        task = DocumentTask(
+            ingestions=[Ingest(path_or_url="test.pdf")],
+            queries=[Query(query_type=QueryType.RAG_QUERY, query="What is this about?")],
+        )
+
+        assert len(task.ingestions) == 1
+        assert len(task.queries) == 1
+        first_ingestion, first_query = task.ingestions[0], task.queries[0]
+        assert first_ingestion.path_or_url == "test.pdf"
+        assert first_query.query == "What is this about?"
+        assert first_query.query_type == QueryType.RAG_QUERY
+
+    def test_document_task_empty_lists(self) -> None:
+        """Empty ingestion and query lists are accepted and stay empty."""
+        empty_task = DocumentTask(ingestions=[], queries=[])
+
+        assert len(empty_task.ingestions) == 0
+        assert len(empty_task.queries) == 0
+
+    def test_document_task_serialization(self) -> None:
+        """``model_dump`` exposes both fields and its output rebuilds a task with the same ingestion."""
+        original = DocumentTask(
+            ingestions=[Ingest(path_or_url="test.pdf")],
+            queries=[Query(query_type=QueryType.RAG_QUERY, query="What is this about?")],
+        )
+
+        # Serialize to a plain dict.
+        dumped = original.model_dump()
+        assert {"ingestions", "queries"} <= dumped.keys()
+
+        # Rebuild from that dict.
+        rebuilt = DocumentTask(**dumped)
+        assert len(rebuilt.ingestions) == 1
+        assert rebuilt.ingestions[0].path_or_url == "test.pdf"
+
+
+class TestDocumentTriageAgent:
+    """Checks of DocumentTriageAgent construction and its system message."""
+
+    @run_for_optional_imports(["openai"], "openai")
+    def test_document_triage_agent_init(self, credentials_gpt_4o_mini: Credentials) -> None:
+        """The agent gets its fixed name, DocumentTask response format and NEVER input mode."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        agent = DocumentTriageAgent(llm_config)
+
+        assert agent.name == "DocumentTriageAgent"
+        assert agent.llm_config["response_format"] == DocumentTask  # type: ignore [index]
+        assert agent.human_input_mode == "NEVER"
+        assert "document triage agent" in agent.system_message.lower()
+
+    @run_for_optional_imports(["openai"], "openai")
+    def test_document_triage_agent_init_none_config(self) -> None:
+        """Passing ``None`` works when the patched current-LLM-config lookup supplies a config."""
+        fallback_config = {"config_list": [{"model": "gpt-4o-mini", "api_key": "test"}]}
+        with patch("autogen.llm_config.LLMConfig.get_current_llm_config") as mock_get_config:
+            mock_get_config.return_value = fallback_config
+            agent = DocumentTriageAgent(None)
+
+            assert agent.name == "DocumentTriageAgent"
+            assert agent.llm_config["response_format"] == DocumentTask  # type: ignore [index]
+
+    @run_for_optional_imports(["openai"], "openai")
+    def test_document_triage_agent_system_message(self, credentials_gpt_4o_mini: Credentials) -> None:
+        """The system message names the agent role, DocumentTask, ingestions and queries."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        agent = DocumentTriageAgent(llm_config)
+        lowered_message = agent.system_message.lower()
+
+        # Every keyword below must appear, compared case-insensitively.
+        for expected in ("document triage agent", "documenttask", "ingestions", "queries"):
+            assert expected in lowered_message
+
+
+class TestDocAgent:
+    """Tests for DocAgent construction, input-message extraction and the inner group chat reply."""
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_document_agent_init(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Test DocAgent initialization."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+        # The task manager, triage and summary sub-agents must be present
+        assert hasattr(document_agent, "_task_manager_agent")
+        assert hasattr(document_agent, "_triage_agent")
+        assert hasattr(document_agent, "_summary_agent")
+
+        # The ingestion, query and error sub-agents must NOT be present
+        assert not hasattr(document_agent, "_data_ingestion_agent")
+        assert not hasattr(document_agent, "_query_agent")
+        assert not hasattr(document_agent, "_error_agent")
+
+        # Check initialization values
+        assert document_agent.name == "DocAgent"
+        assert document_agent.human_input_mode == "NEVER"
+        assert document_agent.documents_ingested == []
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_document_agent_init_defaults(self, credentials_gpt_4o_mini: Credentials) -> None:
+        """Test DocAgent initialization with default values."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        document_agent = DocAgent(llm_config=llm_config)
+
+        assert document_agent.name == "DocAgent"
+        assert document_agent.system_message == DEFAULT_SYSTEM_MESSAGE
+        assert document_agent.human_input_mode == "NEVER"
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_document_agent_init_custom_name(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Test DocAgent initialization with custom name."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        custom_name = "CustomDocAgent"
+        document_agent = DocAgent(name=custom_name, llm_config=llm_config, parsed_docs_path=tmp_path)
+
+        assert document_agent.name == custom_name
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_document_agent_init_custom_system_message(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path
+    ) -> None:
+        """Test DocAgent initialization with custom system message."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        custom_message = "Custom system message for testing."
+        document_agent = DocAgent(llm_config=llm_config, system_message=custom_message, parsed_docs_path=tmp_path)
+
+        assert document_agent.system_message == custom_message
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_document_agent_init_with_custom_query_engine(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path
+    ) -> None:
+        """A query engine passed to DocAgent is the one exposed by its task manager agent."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        mock_query_engine = MagicMock()
+
+        document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path, query_engine=mock_query_engine)
+
+        assert document_agent._task_manager_agent.query_engine == mock_query_engine
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_get_document_input_message_valid(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Test _get_document_input_message with valid messages."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        with patch("autogen.agents.experimental.document_agent.chroma_query_engine.VectorChromaQueryEngine"):
+            document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+            # NOTE(review): the patch targets the defining module; confirm DocAgent actually sees the mock.
+            messages = [{"content": "Test message", "role": "user"}]
+            result = document_agent._get_document_input_message(messages)
+
+            assert result == "Test message"
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_get_document_input_message_none(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Test _get_document_input_message with None messages."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        with patch("autogen.agents.experimental.document_agent.chroma_query_engine.VectorChromaQueryEngine"):
+            document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            result = document_agent._get_document_input_message(None)
+
+            assert result == ""
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_get_document_input_message_empty_list(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Test _get_document_input_message with empty messages list."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        with patch("autogen.agents.experimental.document_agent.chroma_query_engine.VectorChromaQueryEngine"):
+            document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            result = document_agent._get_document_input_message([])
+
+            assert result == ""
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_get_document_input_message_invalid_format(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path
+    ) -> None:
+        """A message dict without the expected keys raises NotImplementedError."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        with patch("autogen.agents.experimental.document_agent.chroma_query_engine.VectorChromaQueryEngine"):
+            document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            with pytest.raises(NotImplementedError, match="Invalid messages format"):
+                document_agent._get_document_input_message([{"invalid": "format"}])
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    @patch("autogen.agents.experimental.document_agent.document_agent.initiate_group_chat")
+    def test_generate_inner_group_chat_reply_basic(
+        self, mock_initiate_group_chat: MagicMock, credentials_gpt_4o_mini: Credentials, tmp_path: Path
+    ) -> None:
+        """With the group chat mocked, the reply is the mocked chat result's summary."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with patch("autogen.agents.experimental.document_agent.chroma_query_engine.VectorChromaQueryEngine"):
+            document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            # Mock the group chat result (a 3-tuple whose first item carries the summary)
+            mock_result = MagicMock()
+            mock_result.summary = "Test summary"
+            mock_initiate_group_chat.return_value = (mock_result, MagicMock(), MagicMock())
+
+            messages = [{"content": "Test message", "role": "user"}]
+            success, reply = document_agent.generate_inner_group_chat_reply(messages=messages)
+
+            assert success is True
+            assert reply == "Test summary"
+            mock_initiate_group_chat.assert_called_once()
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    @patch("autogen.agents.experimental.document_agent.document_agent.initiate_group_chat")
+    def test_generate_inner_group_chat_reply_with_triage_output(
+        self, mock_initiate_group_chat: MagicMock, credentials_gpt_4o_mini: Credentials, tmp_path: Path
+    ) -> None:
+        """Test generate_inner_group_chat_reply with triage agent output."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with patch("autogen.agents.experimental.document_agent.chroma_query_engine.VectorChromaQueryEngine"):
+            document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            # Mock the group chat result
+            mock_result = MagicMock()
+            mock_result.summary = "Test summary"
+            mock_initiate_group_chat.return_value = (mock_result, MagicMock(), MagicMock())
+
+            # Create a triage agent output message (JSON-encoded ingestions and queries)
+            triage_output = {
+                "ingestions": [{"path_or_url": "test.pdf"}],
+                "queries": [{"query_type": "RAG_QUERY", "query": "What is this about?"}],
+            }
+            messages = [{"name": "DocumentTriageAgent", "content": json.dumps(triage_output), "role": "assistant"}]
+
+            success, reply = document_agent.generate_inner_group_chat_reply(messages=messages)
+
+            assert success is True
+            assert reply == "Test summary"
+            mock_initiate_group_chat.assert_called_once()
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_generate_inner_group_chat_reply_invalid_json(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path
+    ) -> None:
+        """Non-JSON triage content is tolerated: the reply is still the mocked chat summary."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.chroma_query_engine.VectorChromaQueryEngine"),
+            patch(
+                "autogen.agents.experimental.document_agent.document_agent.initiate_group_chat"
+            ) as mock_initiate_group_chat,
+        ):
+            document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            # Mock the group chat result
+            mock_result = MagicMock()
+            mock_result.summary = "Test summary"
+            mock_initiate_group_chat.return_value = (mock_result, MagicMock(), MagicMock())
+            # Create a triage agent output message with invalid JSON
+            messages = [{"name": "DocumentTriageAgent", "content": "invalid json content", "role": "assistant"}]
+            success, reply = document_agent.generate_inner_group_chat_reply(messages=messages)
+            assert success is True
+            assert reply == "Test summary"
+            mock_initiate_group_chat.assert_called_once()
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_context_variables_initialization(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """The pattern handed to initiate_group_chat carries the five zero/empty task context variables."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.chroma_query_engine.VectorChromaQueryEngine"),
+            patch(
+                "autogen.agents.experimental.document_agent.document_agent.initiate_group_chat"
+            ) as mock_initiate_group_chat,
+        ):
+            document_agent = DocAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            # Mock the group chat result
+            mock_result = MagicMock()
+            mock_result.summary = "Test summary"
+            mock_initiate_group_chat.return_value = (mock_result, MagicMock(), MagicMock())
+            messages = [{"content": "Test message", "role": "user"}]
+            document_agent.generate_inner_group_chat_reply(messages=messages)
+            # Read the context variables from the ``pattern`` keyword argument of the mocked call
+            call_args = mock_initiate_group_chat.call_args
+            pattern = call_args[1]["pattern"]
+            context_variables = pattern.context_variables
+            assert "CompletedTaskCount" in context_variables
+            assert "DocumentsToIngest" in context_variables
+            assert "DocumentsIngested" in context_variables
+            assert "QueriesToRun" in context_variables
+            assert "QueryResults" in context_variables
+            assert context_variables["CompletedTaskCount"] == 0
+            assert context_variables["DocumentsToIngest"] == []
+            assert context_variables["DocumentsIngested"] == []
+            assert context_variables["QueriesToRun"] == []
+            assert context_variables["QueryResults"] == []
diff --git a/test/agents/experimental/document_agent/test_task_manager.py b/test/agents/experimental/document_agent/test_task_manager.py
new file mode 100644
index 00000000000..e9307219062
--- /dev/null
+++ b/test/agents/experimental/document_agent/test_task_manager.py
@@ -0,0 +1,578 @@
+# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import asyncio
+from pathlib import Path
+from typing import Any
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from autogen.agentchat.group.context_variables import ContextVariables
+from autogen.agents.experimental.document_agent.task_manager import TASK_MANAGER_SYSTEM_MESSAGE, TaskManagerAgent
+from autogen.import_utils import skip_on_missing_imports
+
+from ....conftest import Credentials
+
+
+class TestTaskManagerAgent:
+    """Test TaskManagerAgent class focusing on helper methods and basic functionality."""
+
+    @pytest.fixture
+    def mock_query_engine(self) -> MagicMock:
+        """Create a mock query engine for testing."""
+        mock_engine = MagicMock()
+        mock_engine.add_docs = MagicMock()
+        mock_engine.query = MagicMock()
+        mock_engine.enable_query_citations = False
+        mock_engine.query_with_citations = MagicMock()
+        return mock_engine
+
+    @pytest.fixture
+    def mock_executor(self) -> MagicMock:
+        """Create a mock ThreadPoolExecutor for testing."""
+        mock_executor = MagicMock()
+        mock_executor.shutdown = MagicMock()
+        return mock_executor
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_task_manager_agent_init_basic(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Test TaskManagerAgent basic initialization."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine") as mock_ve,
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor") as mock_tpe,
+        ):
+            mock_ve.return_value = MagicMock()
+            mock_tpe.return_value = MagicMock()
+
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, max_workers=2)
+
+            assert agent.name == "TaskManagerAgent"
+            assert agent.parsed_docs_path == tmp_path
+            assert hasattr(agent, "executor")
+            assert hasattr(agent, "query_engine")
+            mock_tpe.assert_called_once_with(max_workers=2)
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_task_manager_agent_init_defaults(self, credentials_gpt_4o_mini: Credentials) -> None:
+        """Test TaskManagerAgent initialization with default values."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine") as mock_ve,
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor") as mock_tpe,
+        ):
+            mock_ve.return_value = MagicMock()
+            mock_tpe.return_value = MagicMock()
+
+            agent = TaskManagerAgent(llm_config=llm_config)
+
+            assert agent.name == "TaskManagerAgent"
+            assert agent.parsed_docs_path == Path("./parsed_docs")
+            assert hasattr(agent, "executor")
+            assert hasattr(agent, "query_engine")
+            mock_tpe.assert_called_once_with(max_workers=None)
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_task_manager_agent_init_custom_name(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Test TaskManagerAgent initialization with custom name."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        custom_name = "CustomTaskManager"
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        ):
+            agent = TaskManagerAgent(name=custom_name, llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            assert agent.name == custom_name
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_task_manager_agent_init_with_custom_query_engine(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path, mock_query_engine: MagicMock
+    ) -> None:
+        """Test TaskManagerAgent initialization with custom query engine."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"):
+            agent = TaskManagerAgent(llm_config=llm_config, query_engine=mock_query_engine, parsed_docs_path=tmp_path)
+
+            assert agent.query_engine == mock_query_engine
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_task_manager_agent_init_with_collection_name(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path
+    ) -> None:
+        """Test TaskManagerAgent initialization with collection name."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        collection_name = "test_collection"
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine") as mock_ve,
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        ):
+            mock_ve.return_value = MagicMock()
+
+            TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, collection_name=collection_name)
+
+            mock_ve.assert_called_once_with(collection_name=collection_name)
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_task_manager_agent_cleanup(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path, mock_executor: MagicMock
+    ) -> None:
+        """Test TaskManagerAgent cleanup on destruction."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+            patch(
+                "autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor", return_value=mock_executor
+            ),
+        ):
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            # Call __del__ explicitly
+            agent.__del__()
+
+            # Verify shutdown was called
+            mock_executor.shutdown.assert_called_once_with(wait=False)
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_task_manager_agent_cleanup_no_executor(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Test TaskManagerAgent cleanup when executor doesn't exist."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        ):
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            # Remove executor attribute
+            delattr(agent, "executor")
+            # Should not raise an exception
+            agent.__del__()
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_system_message_content(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Check that the agent's system message contains the role phrase and tool names."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        ):
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            # Read the message once and lower-case it so each phrase
+            # check below is case-insensitive.
+            lowered = agent.system_message.lower()
+            for phrase in ("task manager agent", "ingest_documents", "execute_query", "tools"):
+                assert phrase in lowered
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_system_message_constant(self) -> None:
+        """Check that TASK_MANAGER_SYSTEM_MESSAGE contains the role phrase and tool names."""
+        # Lower-case once so every phrase check is case-insensitive.
+        lowered = TASK_MANAGER_SYSTEM_MESSAGE.lower()
+        for phrase in ("task manager agent", "ingest_documents", "execute_query", "tools"):
+            assert phrase in lowered
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_inheritance_from_conversable_agent(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Test that TaskManagerAgent properly inherits from ConversableAgent."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        ):
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+
+            # Check that it has ConversableAgent attributes
+            assert hasattr(agent, "name")
+            assert hasattr(agent, "system_message")
+            assert hasattr(agent, "llm_config")
+            assert hasattr(agent, "function_map")
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_process_single_document_success(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path, mock_query_engine: MagicMock
+    ) -> None:
+        """Construct the agent with docling_parse_docs patched to return one .md file."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager_utils.docling_parse_docs") as mock_parse,
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        ):
+            # Mock successful document parsing
+            mock_output_file = tmp_path / "test.md"
+            mock_output_file.touch()
+            mock_parse.return_value = [mock_output_file]
+
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, query_engine=mock_query_engine)
+
+            # NOTE(review): only construction is asserted; this test never calls the document-processing helper.
+            assert agent.query_engine == mock_query_engine
+            assert agent.parsed_docs_path == tmp_path
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_process_single_document_parsing_failure(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path, mock_query_engine: MagicMock
+    ) -> None:
+        """Construct the agent with docling_parse_docs patched to raise an exception."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager_utils.docling_parse_docs") as mock_parse,
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        ):
+            # Mock document parsing failure
+            mock_parse.side_effect = Exception("Parsing failed")
+
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, query_engine=mock_query_engine)
+
+            # NOTE(review): only construction is asserted; this test never calls the document-processing helper.
+            assert agent.query_engine == mock_query_engine
+            assert agent.parsed_docs_path == tmp_path
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_process_single_document_no_output_files(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path, mock_query_engine: MagicMock
+    ) -> None:
+        """Construct the agent with docling_parse_docs patched to return no output files."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager_utils.docling_parse_docs") as mock_parse,
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        ):
+            # Mock no output files
+            mock_parse.return_value = []
+
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, query_engine=mock_query_engine)
+
+            # NOTE(review): only construction is asserted; this test never calls the document-processing helper.
+            assert agent.query_engine == mock_query_engine
+            assert agent.parsed_docs_path == tmp_path
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_process_single_document_non_markdown_output(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path, mock_query_engine: MagicMock
+    ) -> None:
+        """Construct the agent with docling_parse_docs patched to return one .txt file."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager_utils.docling_parse_docs") as mock_parse,
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        ):
+            # Mock non-markdown output file
+            mock_output_file = tmp_path / "test.txt"
+            mock_output_file.touch()
+            mock_parse.return_value = [mock_output_file]
+
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, query_engine=mock_query_engine)
+
+            # NOTE(review): only construction is asserted; this test never calls the document-processing helper.
+            assert agent.query_engine == mock_query_engine
+            assert agent.parsed_docs_path == tmp_path
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_execute_single_query_success(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path, mock_query_engine: MagicMock
+    ) -> None:
+        """Construct the agent with a query engine whose query() returns a fixed answer."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"):
+            # Mock successful query execution
+            mock_query_engine.query.return_value = "Test answer"
+
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, query_engine=mock_query_engine)
+
+            # NOTE(review): only construction is asserted; this test never calls the query-execution helper.
+            assert agent.query_engine == mock_query_engine
+            assert agent.parsed_docs_path == tmp_path
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_execute_single_query_with_citations(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path, mock_query_engine: MagicMock
+    ) -> None:
+        """Construct the agent with a query engine that has citations enabled and a canned citation result."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"):
+            # Mock citations support
+            mock_citations = MagicMock()
+            mock_citations.answer = "Test answer with citations"
+            mock_citations.citations = [
+                MagicMock(
+                    node=MagicMock(get_text=MagicMock(return_value="Citation text")), metadata={"file_path": "test.pdf"}
+                )
+            ]
+
+            mock_query_engine.enable_query_citations = True
+            mock_query_engine.query_with_citations.return_value = mock_citations
+
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, query_engine=mock_query_engine)
+
+            # NOTE(review): only construction is asserted; this test never calls the query-execution helper.
+            assert agent.query_engine == mock_query_engine
+            assert agent.parsed_docs_path == tmp_path
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_execute_single_query_failure(
+        self, credentials_gpt_4o_mini: Credentials, tmp_path: Path, mock_query_engine: MagicMock
+    ) -> None:
+        """Construct the agent with a query engine whose query() raises an exception."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+
+        with patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"):
+            # Mock query execution failure
+            mock_query_engine.query.side_effect = Exception("Query failed")
+
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, query_engine=mock_query_engine)
+
+            # NOTE(review): only construction is asserted; this test never calls the query-execution helper.
+            assert agent.query_engine == mock_query_engine
+            assert agent.parsed_docs_path == tmp_path
+
+    @pytest.mark.openai
+    @skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+    def test_execute_single_query_no_query_engine(self, credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+        """Test resetting query_engine to None after construction (default engine class patched out)."""
+        llm_config = credentials_gpt_4o_mini.llm_config
+        # No query_engine is passed: patch VectorChromaQueryEngine so a real engine is never constructed.
+        with (
+            patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+            patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        ):
+            agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+            agent.query_engine = None  # type: ignore[assignment]
+            assert agent.query_engine is None
+            assert agent.parsed_docs_path == tmp_path
+
+
+@pytest.mark.openai
+@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+def test_task_manager_agent_init_with_rag_config(credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+    """Test TaskManagerAgent initialization with rag_config."""
+    llm_config = credentials_gpt_4o_mini.llm_config
+    rag_config: dict[str, Any] = {"vector": {}, "graph": {"host": "bolt://localhost"}}
+
+    with (
+        patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine") as mock_ve,
+        patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+    ):
+        mock_ve.return_value = MagicMock()
+        agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, rag_config=rag_config)
+        assert agent.rag_config == rag_config
+
+
+@pytest.mark.openai
+@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+def test_task_manager_agent_init_with_custom_system_message(
+    credentials_gpt_4o_mini: Credentials, tmp_path: Path
+) -> None:
+    """Test TaskManagerAgent initialization with custom system message."""
+    llm_config = credentials_gpt_4o_mini.llm_config
+    custom_message = "Custom system message"
+
+    with (
+        patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+        patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+    ):
+        agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, custom_system_message=custom_message)
+        assert agent.system_message == custom_message
+
+
+@pytest.mark.openai
+@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+def test_create_rag_engines_with_graph_config(credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+    """Test _create_rag_engines with graph configuration."""
+    llm_config = credentials_gpt_4o_mini.llm_config
+    rag_config: dict[str, Any] = {"graph": {"host": "bolt://localhost", "port": 7687}}
+
+    with (
+        patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+        patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        patch("autogen.agentchat.contrib.graph_rag.neo4j_graph_query_engine.Neo4jGraphQueryEngine") as mock_neo4j,
+    ):
+        mock_neo4j.return_value = MagicMock()
+        agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, rag_config=rag_config)
+        assert "graph" in agent.rag_engines
+
+
+@pytest.mark.openai
+@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+def test_create_neo4j_engine_import_error(credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+    """Test _create_neo4j_engine with ImportError."""
+    llm_config = credentials_gpt_4o_mini.llm_config
+    rag_config: dict[str, Any] = {"graph": {}}
+
+    with (
+        patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+        patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+        patch(
+            "autogen.agentchat.contrib.graph_rag.neo4j_graph_query_engine.Neo4jGraphQueryEngine",
+            side_effect=ImportError("No module"),
+        ),
+    ):
+        agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path, rag_config=rag_config)
+        assert agent.rag_engines.get("graph") is None
+
+
+@pytest.mark.openai
+@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+def test_safe_context_update(credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+    """Test _safe_context_update method."""
+    llm_config = credentials_gpt_4o_mini.llm_config
+
+    with (
+        patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+        patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+    ):
+        agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+        context_vars = ContextVariables()
+        agent._safe_context_update(context_vars, "test_key", "test_value")
+        assert context_vars["test_key"] == "test_value"
+
+
+@pytest.mark.openai
+@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+def test_ingest_documents_empty_list(credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+    """Verify the ingest_documents tool reports that no documents were provided for an empty list."""
+    llm_config = credentials_gpt_4o_mini.llm_config
+
+    with (
+        patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+        patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+    ):
+        agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+        ctx_vars = ContextVariables()
+
+        # Pick the first registered tool named "ingest_documents";
+        # fall back to None so the assertion below reports a clear failure.
+        ingest_tool = next(
+            (tool for tool in agent.tools if tool.name == "ingest_documents"),
+            None,
+        )
+
+        assert ingest_tool is not None, "ingest_documents tool not found"
+        outcome = asyncio.run(ingest_tool.func([], ctx_vars))
+        assert "No documents provided" in str(outcome.message)
+
+
+@pytest.mark.openai
+@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+def test_ingest_documents_invalid_paths(credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+    """Verify the ingest_documents tool reports no valid documents for empty and whitespace-only paths."""
+    llm_config = credentials_gpt_4o_mini.llm_config
+
+    with (
+        patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+        patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+    ):
+        agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+        ctx_vars = ContextVariables()
+
+        # Pick the first registered tool named "ingest_documents";
+        # fall back to None so the assertion below reports a clear failure.
+        ingest_tool = next(
+            (tool for tool in agent.tools if tool.name == "ingest_documents"),
+            None,
+        )
+
+        assert ingest_tool is not None, "ingest_documents tool not found"
+        outcome = asyncio.run(ingest_tool.func(["", "   "], ctx_vars))
+        assert "No valid documents found" in str(outcome.message)
+
+
+@pytest.mark.openai
+@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+def test_execute_query_empty_list(credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+    """Verify the execute_query tool returns the no-queries message for an empty query list."""
+    llm_config = credentials_gpt_4o_mini.llm_config
+
+    with (
+        patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+        patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+    ):
+        agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+        ctx_vars = ContextVariables()
+
+        # Pick the first registered tool named "execute_query";
+        # fall back to None so the assertion below reports a clear failure.
+        query_tool = next(
+            (tool for tool in agent.tools if tool.name == "execute_query"),
+            None,
+        )
+
+        assert query_tool is not None, "execute_query tool not found"
+        outcome = asyncio.run(query_tool.func([], ctx_vars))
+        assert outcome == "No queries to run"
+
+
+@pytest.mark.openai
+@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+def test_execute_query_invalid_queries(credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+    """Verify the execute_query tool returns the no-valid-queries message for blank queries."""
+    llm_config = credentials_gpt_4o_mini.llm_config
+
+    with (
+        patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+        patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor"),
+    ):
+        agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+        ctx_vars = ContextVariables()
+
+        # Pick the first registered tool named "execute_query";
+        # fall back to None so the assertion below reports a clear failure.
+        query_tool = next(
+            (tool for tool in agent.tools if tool.name == "execute_query"),
+            None,
+        )
+
+        assert query_tool is not None, "execute_query tool not found"
+        outcome = asyncio.run(query_tool.func(["", "   "], ctx_vars))
+        assert outcome == "No valid queries provided"
+
+
+@pytest.mark.openai
+@skip_on_missing_imports(["selenium", "webdriver_manager"], "rag")
+def test_del_with_exception(credentials_gpt_4o_mini: Credentials, tmp_path: Path) -> None:
+    """Test __del__ method with exception during shutdown."""
+    llm_config = credentials_gpt_4o_mini.llm_config
+    mock_executor = MagicMock()
+    mock_executor.shutdown.side_effect = Exception("Shutdown failed")
+
+    with (
+        patch("autogen.agents.experimental.document_agent.task_manager.VectorChromaQueryEngine"),
+        patch("autogen.agents.experimental.document_agent.task_manager.ThreadPoolExecutor", return_value=mock_executor),
+    ):
+        agent = TaskManagerAgent(llm_config=llm_config, parsed_docs_path=tmp_path)
+        agent.__del__()
+        mock_executor.shutdown.assert_called_once_with(wait=False)
diff --git a/test/agents/experimental/document_agent/test_task_manager_utils.py b/test/agents/experimental/document_agent/test_task_manager_utils.py
new file mode 100644
index 00000000000..09e9b5ff7ad
--- /dev/null
+++ b/test/agents/experimental/document_agent/test_task_manager_utils.py
@@ -0,0 +1,567 @@
+# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from pathlib import Path
+from typing import Any
+from unittest.mock import Mock, mock_open, patch
+
+import pytest
+import requests
+
+from autogen.agents.experimental.document_agent.task_manager_utils import (
+    aggregate_rag_results,
+    execute_single_query,
+    extract_text_from_pdf,
+    ingest_to_engines,
+    process_single_document,
+)
+
+
+class TestExtractTextFromPDF:
+    """Tests for the extract_text_from_pdf function."""
+
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.fitz")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.requests.get")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.tempfile.TemporaryDirectory")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.LLMLingua")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.TextMessageCompressor")
+    def test_extract_text_from_pdf_url_success(
+        self, mock_compressor_class: Any, mock_llm_lingua_class: Any, mock_temp_dir: Any, mock_get: Any, mock_fitz: Any
+    ) -> None:
+        """Test successful PDF text extraction from a URL."""
+        # Setup mocks; stacked @patch decorators inject arguments bottom-up, hence the reversed order
+        mock_response = Mock()
+        mock_response.content = b"fake pdf content"
+        mock_get.return_value = mock_response
+
+        # A plain Mock has no context-manager support, so __enter__/__exit__ are set explicitly
+        mock_temp_dir_instance = Mock()
+        mock_temp_dir_instance.__enter__ = Mock(return_value="/tmp/test")
+        mock_temp_dir_instance.__exit__ = Mock(return_value=None)
+        mock_temp_dir.return_value = mock_temp_dir_instance
+
+        # Create a mock document that supports iteration over two pages
+        mock_doc = Mock()
+        mock_page1 = Mock()
+        mock_page1.get_text.return_value = "Page 1 content"
+        mock_page2 = Mock()
+        mock_page2.get_text.return_value = "Page 2 content"
+
+        # NOTE: return_value is a one-shot iterator, so the document can be iterated only once
+        mock_doc.__iter__ = Mock(return_value=iter([mock_page1, mock_page2]))
+        mock_fitz.open.return_value.__enter__.return_value = mock_doc
+
+        mock_compressor = Mock()
+        mock_compressor.apply_transform.return_value = [{"content": "compressed text"}]
+        mock_compressor_class.return_value = mock_compressor
+
+        mock_llm_lingua = Mock()
+        mock_llm_lingua_class.return_value = mock_llm_lingua
+
+        # Patch URL parsing to report a scheme, and builtins.open to avoid touching the filesystem
+        with (
+            patch(
+                "autogen.agents.experimental.document_agent.task_manager_utils.urllib3.util.url.parse_url"
+            ) as mock_parse_url,
+            patch("builtins.open", mock_open()) as mock_file,
+        ):
+            mock_parsed_url = Mock()
+            mock_parsed_url.scheme = "https"
+            mock_parse_url.return_value = mock_parsed_url
+
+            # Execute
+            result = extract_text_from_pdf("https://example.com/test.pdf")
+
+            # Assertions
+            assert result == [{"content": "compressed text"}]
+            mock_get.assert_called_once_with("https://example.com/test.pdf")
+            mock_fitz.open.assert_called_once()
+            mock_compressor.apply_transform.assert_called_once_with([{"content": "Page 1 contentPage 2 content"}])
+            # Verify the download was opened for binary writing; the path is expected as a Path, not a str
+            mock_file.assert_called_with(Path("/tmp/test/temp.pdf"), "wb")
+
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.urllib3.util.url.parse_url")
+    def test_extract_text_from_pdf_non_url_raises_error(self, mock_parse_url: Any) -> None:
+        """Test that input without a URL scheme raises ValueError."""
+        mock_parsed_url = Mock()
+        mock_parsed_url.scheme = None
+        mock_parse_url.return_value = mock_parsed_url
+
+        with pytest.raises(ValueError, match="doc_path must be a string or a URL"):
+            extract_text_from_pdf("not_a_url")
+
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.requests.get")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.urllib3.util.url.parse_url")
+    def test_extract_text_from_pdf_request_error(self, mock_parse_url: Any, mock_get: Any) -> None:
+        """Test that a request error raised while downloading propagates to the caller."""
+        mock_parsed_url = Mock()
+        mock_parsed_url.scheme = "https"
+        mock_parse_url.return_value = mock_parsed_url
+
+        mock_get.side_effect = requests.RequestException("Network error")
+
+        with pytest.raises(requests.RequestException):
+            extract_text_from_pdf("https://example.com/test.pdf")
+
+
+class TestAggregateRagResults:
+    """Tests for the aggregate_rag_results function."""
+
+    def test_aggregate_rag_results_empty(self) -> None:
+        """Test that an empty results mapping yields the "no results" message."""
+        result = aggregate_rag_results("test query", {})
+        expected = "Query: test query\nAnswer: No results found from any RAG engine."
+        assert result == expected
+
+    def test_aggregate_rag_results_single_engine(self) -> None:
+        """Test aggregation with a single engine result and an empty citation list."""
+        results: dict[str, Any] = {"vector": {"answer": "Vector answer", "citations": []}}
+        result = aggregate_rag_results("test query", results)
+        expected = "Query: test query\n\nVECTOR Results:\nAnswer: Vector answer"
+        assert result == expected
+
+    def test_aggregate_rag_results_with_citations(self) -> None:
+        """Test that citations are rendered as a numbered list of file paths."""
+        results: dict[str, Any] = {
+            "vector": {"answer": "Vector answer", "citations": [{"file_path": "doc1.pdf"}, {"file_path": "doc2.pdf"}]}
+        }
+        result = aggregate_rag_results("test query", results)
+        assert "Citations:" in result
+        assert "[1] doc1.pdf" in result
+        assert "[2] doc2.pdf" in result
+
+    def test_aggregate_rag_results_multiple_engines(self) -> None:
+        """Test that each engine's answer appears under its upper-cased engine name."""
+        results: dict[str, Any] = {"vector": {"answer": "Vector answer"}, "graph": {"answer": "Graph answer"}}
+        result = aggregate_rag_results("test query", results)
+        assert "VECTOR Results:" in result
+        assert "GRAPH Results:" in result
+        assert "Vector answer" in result
+        assert "Graph answer" in result
+
+    def test_aggregate_rag_results_missing_answer(self) -> None:
+        """Test that an engine result without an answer key falls back to a placeholder."""
+        results: dict[str, Any] = {"vector": {}}
+        result = aggregate_rag_results("test query", results)
+        assert "No answer available" in result
+
+
+class TestIngestToEngines:
+    """Tests for the ingest_to_engines function."""
+
+    @patch("autogen.agentchat.contrib.graph_rag.document.Document")
+    @patch("autogen.agentchat.contrib.graph_rag.document.DocumentType")
+    def test_ingest_to_engines_vector_only(self, mock_doc_type: Any, mock_document: Any) -> None:
+        """Test ingestion when only the vector engine is configured."""
+        # Setup mocks
+        mock_doc_type.TEXT = "TEXT"
+        mock_doc_type.PDF = "PDF"
+        mock_doc = Mock()
+        mock_document.return_value = mock_doc
+
+        # Mock engines
+        vector_engine = Mock()
+        rag_engines = {"vector": vector_engine}
+        rag_config: list[str] = ["vector"]
+
+        # Execute
+        ingest_to_engines("/path/to/file.md", "/path/to/input.pdf", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        vector_engine.add_docs.assert_called_once_with(new_doc_paths_or_urls=["/path/to/file.md"])
+        mock_document.assert_called_once_with(doctype="PDF", path_or_url="/path/to/file.md", data=None)
+
+    @patch("autogen.agentchat.contrib.graph_rag.document.Document")
+    @patch("autogen.agentchat.contrib.graph_rag.document.DocumentType")
+    def test_ingest_to_engines_graph_uninitialized(self, mock_doc_type: Any, mock_document: Any) -> None:
+        """Test that an uninitialized graph engine is initialized via init_db and then flagged."""
+        # Setup mocks
+        mock_doc_type.TEXT = "TEXT"
+        mock_doc = Mock()
+        mock_document.return_value = mock_doc
+
+        # Mock engines
+        graph_engine = Mock()
+        # Delete _initialized so reading it raises AttributeError, simulating an uninitialized engine
+        del graph_engine._initialized
+        rag_engines = {"graph": graph_engine}
+        rag_config: list[str] = ["graph"]
+
+        # Execute
+        ingest_to_engines("/path/to/file.md", "/path/to/input.txt", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        graph_engine.init_db.assert_called_once_with([mock_doc])
+        assert graph_engine._initialized is True
+
+    @patch("autogen.agentchat.contrib.graph_rag.document.Document")
+    @patch("autogen.agentchat.contrib.graph_rag.document.DocumentType")
+    def test_ingest_to_engines_graph_initialized(self, mock_doc_type: Any, mock_document: Any) -> None:
+        """Test that an already initialized graph engine receives add_records instead of init_db."""
+        # Setup mocks
+        mock_doc_type.TEXT = "TEXT"
+        mock_doc = Mock()
+        mock_document.return_value = mock_doc
+
+        # Mock engines
+        graph_engine = Mock()
+        graph_engine._initialized = True  # Already initialized
+        rag_engines = {"graph": graph_engine}
+        rag_config: list[str] = ["graph"]
+
+        # Execute
+        ingest_to_engines("/path/to/file.md", "/path/to/input.txt", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        graph_engine.init_db.assert_not_called()
+        graph_engine.add_records.assert_called_once_with([mock_doc])
+
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.logger")
+    @patch("autogen.agentchat.contrib.graph_rag.document.Document")
+    @patch("autogen.agentchat.contrib.graph_rag.document.DocumentType")
+    def test_ingest_to_engines_error_handling(self, mock_doc_type: Any, mock_document: Any, mock_logger: Any) -> None:
+        """Test that an engine failure during ingestion is logged as a warning, not raised."""
+        # Setup mocks
+        mock_doc_type.TEXT = "TEXT"
+        mock_doc = Mock()
+        mock_document.return_value = mock_doc
+
+        # Mock engines with error
+        vector_engine = Mock()
+        vector_engine.add_docs.side_effect = Exception("Ingestion failed")
+        rag_engines = {"vector": vector_engine}
+        rag_config: list[str] = ["vector"]
+
+        # Execute
+        ingest_to_engines("/path/to/file.md", "/path/to/input.txt", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        mock_logger.warning.assert_called_once_with("Failed to ingest to vector engine: Ingestion failed")
+
+    def test_ingest_to_engines_missing_engine(self) -> None:
+        """Test that a configured engine type absent from rag_engines is tolerated."""
+        rag_engines: dict[str, Any] = {}
+        rag_config: list[str] = ["vector"]
+
+        # Should not raise any error
+        ingest_to_engines("/path/to/file.md", "/path/to/input.txt", rag_config, rag_engines)  # type: ignore
+
+
+class TestProcessSingleDocument:
+    """Tests for the process_single_document function."""
+
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.urllib3.util.url.parse_url")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.requests.get")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.tempfile.TemporaryDirectory")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.fitz")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.compress_and_save_text")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.ingest_to_engines")
+    @patch("builtins.open", new_callable=mock_open)
+    def test_process_pdf_url_success(
+        self,
+        mock_file_open: Any,
+        mock_ingest: Any,
+        mock_compress: Any,
+        mock_fitz: Any,
+        mock_temp_dir: Any,
+        mock_get: Any,
+        mock_parse_url: Any,
+    ) -> None:
+        """Test successful processing of a PDF referenced by URL."""
+        # Setup mocks; stacked @patch decorators inject arguments bottom-up, hence the reversed order
+        mock_parsed_url = Mock()
+        mock_parsed_url.scheme = "https"
+        mock_parse_url.return_value = mock_parsed_url
+
+        mock_response = Mock()
+        mock_response.content = b"fake pdf content"
+        mock_get.return_value = mock_response
+
+        # Make the patched TemporaryDirectory context manager yield a fixed directory path
+        mock_temp_dir.return_value.__enter__.return_value = "/tmp/test"
+
+        # Create a mock document that supports iteration over a single page
+        mock_doc = Mock()
+        mock_page = Mock()
+        mock_page.get_text.return_value = "PDF content"
+
+        # NOTE: return_value is a one-shot iterator, so the document can be iterated only once
+        mock_doc.__iter__ = Mock(return_value=iter([mock_page]))
+        mock_fitz.open.return_value.__enter__.return_value = mock_doc
+
+        mock_compress.return_value = "/path/to/output.md"
+
+        rag_config: list[str] = ["vector"]
+        rag_engines = {"vector": Mock()}
+
+        # Execute
+        result = process_single_document("https://example.com/test.pdf", Path("/output"), rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result == ("https://example.com/test.pdf", True, "")
+        mock_compress.assert_called_once()
+        mock_ingest.assert_called_once()
+
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.urllib3.util.url.parse_url")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.fitz")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.compress_and_save_text")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.ingest_to_engines")
+    def test_process_local_pdf_success(
+        self, mock_ingest: Any, mock_compress: Any, mock_fitz: Any, mock_parse_url: Any
+    ) -> None:
+        """Test successful processing of a local PDF file (path without a URL scheme)."""
+        # Setup mocks
+        mock_parsed_url = Mock()
+        mock_parsed_url.scheme = None
+        mock_parse_url.return_value = mock_parsed_url
+
+        # Create a mock document that supports iteration over a single page
+        mock_doc = Mock()
+        mock_page = Mock()
+        mock_page.get_text.return_value = "PDF content"
+
+        # NOTE: return_value is a one-shot iterator, so the document can be iterated only once
+        mock_doc.__iter__ = Mock(return_value=iter([mock_page]))
+        mock_fitz.open.return_value.__enter__.return_value = mock_doc
+
+        mock_compress.return_value = "/path/to/output.md"
+
+        rag_config: list[str] = ["vector"]
+        rag_engines = {"vector": Mock()}
+
+        # Execute
+        result = process_single_document("/path/to/test.pdf", Path("/output"), rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result == ("/path/to/test.pdf", True, "")
+        mock_compress.assert_called_once()
+        mock_ingest.assert_called_once()
+
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.urllib3.util.url.parse_url")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.docling_parse_docs")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.ingest_to_engines")
+    def test_process_non_pdf_success(self, mock_ingest: Any, mock_docling: Any, mock_parse_url: Any) -> None:
+        """Test that a non-PDF document is parsed via docling_parse_docs and then ingested."""
+        # Setup mocks
+        mock_parsed_url = Mock()
+        mock_parsed_url.scheme = None
+        mock_parse_url.return_value = mock_parsed_url
+
+        mock_output_file = Path("/output/test.md")
+        mock_docling.return_value = [mock_output_file]
+
+        rag_config: list[str] = ["vector"]
+        rag_engines = {"vector": Mock()}
+
+        # Execute
+        result = process_single_document("/path/to/test.txt", Path("/output"), rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result == ("/path/to/test.txt", True, "")
+        mock_docling.assert_called_once_with(
+            input_file_path="/path/to/test.txt", output_dir_path=Path("/output"), output_formats=["markdown"]
+        )
+        mock_ingest.assert_called_once()
+
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.urllib3.util.url.parse_url")
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.docling_parse_docs")
+    def test_process_non_pdf_no_output(self, mock_docling: Any, mock_parse_url: Any) -> None:
+        """Test that an empty docling_parse_docs result is reported as a failure."""
+        # Setup mocks
+        mock_parsed_url = Mock()
+        mock_parsed_url.scheme = None
+        mock_parse_url.return_value = mock_parsed_url
+
+        mock_docling.return_value = []
+
+        rag_config: list[str] = ["vector"]
+        rag_engines = {"vector": Mock()}
+
+        # Execute
+        result = process_single_document("/path/to/test.txt", Path("/output"), rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result == ("/path/to/test.txt", False, "No valid markdown output generated")
+
+    @patch("autogen.agents.experimental.document_agent.task_manager_utils.urllib3.util.url.parse_url")
+    def test_process_document_exception(self, mock_parse_url: Any) -> None:
+        """Test that an exception is returned as a failed result carrying the error message."""
+        # Setup mocks
+        mock_parse_url.side_effect = Exception("Parse error")
+
+        rag_config: list[str] = ["vector"]
+        rag_engines = {"vector": Mock()}
+
+        # Execute
+        result = process_single_document("/path/to/test.txt", Path("/output"), rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result == ("/path/to/test.txt", False, "Parse error")
+
+
+class TestExecuteSingleQuery:
+    """Tests for the execute_single_query function."""
+
+    def test_execute_query_vector_with_citations(self) -> None:
+        """Test that a citation-enabled vector engine is queried via query_with_citations."""
+        # Mock engine with citations
+        mock_engine = Mock()
+        mock_engine.enable_query_citations = True
+        mock_answer_with_citations = Mock()
+        mock_answer_with_citations.answer = "Test answer"
+
+        mock_source = Mock()
+        mock_source.node.get_text.return_value = "Source text"
+        mock_source.metadata.get.return_value = "source.pdf"
+        mock_answer_with_citations.citations = [mock_source]
+
+        mock_engine.query_with_citations.return_value = mock_answer_with_citations
+
+        rag_config: list[str] = ["vector"]
+        rag_engines = {"vector": mock_engine}
+
+        # Execute
+        result = execute_single_query("test query", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result[0] == "test query"
+        assert "Test answer" in result[1]
+        assert "VECTOR Results:" in result[1]
+        mock_engine.query_with_citations.assert_called_once_with("test query")
+
+    def test_execute_query_vector_without_citations(self) -> None:
+        """Test that a vector engine with citations disabled is queried via plain query()."""
+        # Mock engine without citations
+        mock_engine = Mock()
+        # A bare Mock attribute would be truthy, so citations are switched off explicitly
+        mock_engine.enable_query_citations = False
+        mock_engine.query.return_value = "Test answer"
+
+        rag_config: list[str] = ["vector"]
+        rag_engines = {"vector": mock_engine}
+
+        # Execute
+        result = execute_single_query("test query", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result[0] == "test query"
+        assert "Test answer" in result[1]
+        mock_engine.query.assert_called_once_with("test query")
+
+    def test_execute_query_graph_success(self) -> None:
+        """Test a successful query against a graph engine that already has an index."""
+        # Mock graph engine
+        mock_engine = Mock()
+        mock_engine.index = "some_index"  # Has index, so already connected
+
+        mock_graph_result = Mock()
+        mock_graph_result.answer = "Graph answer"
+        mock_graph_result.results = {"nodes": []}
+        mock_engine.query.return_value = mock_graph_result
+
+        rag_config: list[str] = ["graph"]
+        rag_engines = {"graph": mock_engine}
+
+        # Execute
+        result = execute_single_query("test query", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result[0] == "test query"
+        assert "Graph answer" in result[1]
+        assert "GRAPH Results:" in result[1]
+        mock_engine.query.assert_called_once_with("test query")
+
+    def test_execute_query_graph_connect_error(self) -> None:
+        """Test that a failing connect_db on a graph engine is reported in the answer text."""
+        # Mock graph engine without index
+        mock_engine = Mock()
+        # Delete index so reading it raises AttributeError, simulating an unconnected engine
+        del mock_engine.index
+        mock_engine.connect_db.side_effect = Exception("Connection failed")
+
+        rag_config: list[str] = ["graph"]
+        rag_engines = {"graph": mock_engine}
+
+        # Execute
+        result = execute_single_query("test query", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result[0] == "test query"
+        assert "Error connecting to graph: Connection failed" in result[1]
+
+    def test_execute_query_engine_error(self) -> None:
+        """Test that an exception raised by an engine's query() is reported in the answer text."""
+        # Mock engine that raises error
+        mock_engine = Mock()
+        # A bare Mock attribute would be truthy, so citations are switched off explicitly
+        mock_engine.enable_query_citations = False
+        mock_engine.query.side_effect = Exception("Query failed")
+
+        rag_config: list[str] = ["vector"]
+        rag_engines = {"vector": mock_engine}
+
+        # Execute
+        result = execute_single_query("test query", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result[0] == "test query"
+        assert "Error querying vector: Query failed" in result[1]
+
+    def test_execute_query_missing_engine(self) -> None:
+        """Test that a configured engine type absent from rag_engines yields the "no results" message."""
+        rag_config: list[str] = ["vector"]
+        rag_engines: dict[str, Any] = {}  # No engines
+
+        # Execute
+        result = execute_single_query("test query", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result[0] == "test query"
+        assert "No results found from any RAG engine" in result[1]
+
+    def test_execute_query_general_exception(self) -> None:
+        """Test that an exception raised while aggregating results is reported in the answer text."""
+        rag_config: list[str] = ["vector"]
+        rag_engines = {"vector": Mock()}
+
+        with patch(
+            "autogen.agents.experimental.document_agent.task_manager_utils.aggregate_rag_results"
+        ) as mock_aggregate:
+            mock_aggregate.side_effect = Exception("Aggregation failed")
+
+            # Execute
+            result = execute_single_query("test query", rag_config, rag_engines)  # type: ignore
+
+            # Assertions
+            assert result[0] == "test query"
+            assert "Error executing query: Aggregation failed" in result[1]
+
+    def test_execute_query_multiple_engines(self) -> None:
+        """Test that results from both the vector and graph engines appear in one answer."""
+        # Mock vector engine
+        mock_vector_engine = Mock()
+        # A bare Mock attribute would be truthy, so citations are switched off explicitly
+        mock_vector_engine.enable_query_citations = False
+        mock_vector_engine.query.return_value = "Vector answer"
+
+        # Mock graph engine
+        mock_graph_engine = Mock()
+        mock_graph_engine.index = "some_index"
+        mock_graph_result = Mock()
+        mock_graph_result.answer = "Graph answer"
+        mock_graph_result.results = {"nodes": []}
+        mock_graph_engine.query.return_value = mock_graph_result
+
+        rag_config: list[str] = ["vector", "graph"]
+        rag_engines = {"vector": mock_vector_engine, "graph": mock_graph_engine}
+
+        # Execute
+        result = execute_single_query("test query", rag_config, rag_engines)  # type: ignore
+
+        # Assertions
+        assert result[0] == "test query"
+        assert "Vector answer" in result[1]
+        assert "Graph answer" in result[1]
+        assert "VECTOR Results:" in result[1]
+        assert "GRAPH Results:" in result[1]
diff --git a/test/conftest.py b/test/conftest.py
index 25566dc29dc..c8e41f87fb3 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -14,6 +14,8 @@
 from test.const import KEY_LOC, MOCK_AZURE_API_KEY, MOCK_OPEN_AI_API_KEY, OAI_CONFIG_LIST
 from test.credentials import Credentials, Secrets
 
+__all__ = ["Credentials"]
+
 
 def patch_pytest_terminal_writer() -> None:
     import _pytest._io
diff --git a/website/docs/user-guide/reference-agents/DocAgent_Architecture.png b/website/docs/user-guide/reference-agents/DocAgent_Architecture.png
new file mode 100644
index 00000000000..b3b721584bf
--- /dev/null
+++ b/website/docs/user-guide/reference-agents/DocAgent_Architecture.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:c03a96d2c7e871ee832449717734b49ba78aec3c31b09e9c8a2ba6ea24cdc90e
+size 396420
diff --git a/website/docs/user-guide/reference-agents/docagent.mdx b/website/docs/user-guide/reference-agents/docagent.mdx
index e9be9cd8a43..1e670f965cf 100644
--- a/website/docs/user-guide/reference-agents/docagent.mdx
+++ b/website/docs/user-guide/reference-agents/docagent.mdx
@@ -3,439 +3,564 @@ title: DocAgent
 sidebarTitle: DocAgent
 ---
 
-In the realm of AI and automation, handling documents and extracting information efficiently is of utmost importance.
-[`DocAgent`](/docs/api-reference/autogen/agents/experimental/DocAgent) introduces an agentic solution to this problem. It handles document ingestion and query tasks seamlessly, and with natural language instructions, by leveraging an internal swarm of agents to streamline document processing and information retrieval.
+# DocAgent: Comprehensive Documentation
 
-
DocAgent Architecture Diagram
+ +
+