Skip to content

Commit 49a0c20

Browse files
committed
⚡️(summarize) use semchunk for better doc chunking
This reduces the code complexity while allowing better "cuts" and providing overlap for free. Also, do not wait for sub-batches to complete; use global concurrency instead.
1 parent ec9af9a commit 49a0c20

File tree

7 files changed

+166
-99
lines changed

7 files changed

+166
-99
lines changed

src/backend/chat/agents/summarize.py

Lines changed: 91 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
"""Build the summarization agent."""
22

3+
import asyncio
34
import dataclasses
45
import logging
5-
import asyncio
66

77
from django.conf import settings
88
from django.core.files.storage import default_storage
99

10+
import semchunk
1011
from asgiref.sync import sync_to_async
1112
from pydantic_ai import RunContext
1213
from pydantic_ai.messages import ToolReturn
1314

1415
from .base import BaseAgent
15-
from ..tools.document_search_rag import add_document_rag_search_tool
1616

1717
logger = logging.getLogger(__name__)
1818

@@ -37,28 +37,49 @@ def read_document_content(doc):
3737
return doc.file_name, f.read().decode("utf-8")
3838

3939

40-
async def hand_off_to_summarization_agent(
40+
async def summarize_chunk(idx, chunk, total_chunks, summarization_agent, ctx):
41+
"""Summarize a single chunk of text."""
42+
sum_prompt = (
43+
"You are an agent specializing in text summarization. "
44+
"Generate a clear and concise summary of the following passage "
45+
f"(part {idx}/{total_chunks}):\n'''\n{chunk}\n'''\n\n"
46+
)
47+
48+
logger.debug(
49+
"[summarize] CHUNK %s/%s prompt=> %s", idx, total_chunks, sum_prompt[0:100] + "..."
50+
)
51+
52+
resp = await summarization_agent.run(sum_prompt, usage=ctx.usage)
53+
54+
logger.debug("[summarize] CHUNK %s/%s response<= %s", idx, total_chunks, resp.output or "")
55+
return resp.output or ""
56+
57+
58+
async def hand_off_to_summarization_agent( # pylint: disable=too-many-locals
4159
ctx: RunContext, *, instructions: str | None = None
4260
) -> ToolReturn:
4361
"""
44-
Summarize the documents for the user, only when asked for.
62+
Generate a complete, ready-to-use summary of the documents in context
63+
(do not request the documents to the user).
64+
Return this summary directly to the user WITHOUT any modification,
65+
or additional summarization.
66+
The summary is already optimized and MUST be presented as-is in the final response
67+
or translated preserving the information.
68+
4569
Instructions are optional but should reflect the user's request.
46-
Examples :
47-
"Résume ce doc en 2 paragraphes" -> instructions = "résumé en 2 paragraphes"
48-
"Résume ce doc en anglais" -> instructions = "In English"
49-
"Résume ce doc" -> instructions = "" (default)
70+
71+
Examples:
72+
"Summarize this doc in 2 paragraphs" -> instructions = "summary in 2 paragraphs"
73+
"Summarize this doc in English" -> instructions = "In English"
74+
"Summarize this doc" -> instructions = "" (default)
75+
5076
Args:
5177
instructions (str | None): The instructions the user gave to use for the summarization
5278
"""
53-
summarization_agent = SummarizationAgent()
54-
55-
prompt = (
56-
"Do not mention the user request in your answer.\n"
57-
"User request:\n"
58-
"{user_prompt}\n\n"
59-
"Document contents:\n"
60-
"{documents_prompt}\n"
79+
instructions_hint = (
80+
instructions.strip() if instructions else "The summary should contain 2 or 3 parts."
6181
)
82+
summarization_agent = SummarizationAgent()
6283

6384
# Collect documents content
6485
text_attachment = await sync_to_async(list)(
@@ -69,70 +90,70 @@ async def hand_off_to_summarization_agent(
6990

7091
documents = [await read_document_content(doc) for doc in text_attachment]
7192

72-
# Instructions: rely on tool argument only; model should extract them upstream
73-
if instructions is not None:
74-
instructions_hint: str = instructions.strip()
75-
else:
76-
instructions_hint = ""
77-
78-
# Helpers
79-
def chunk_text(text: str, size: int = 10000) -> list[str]:
80-
if size <= 0:
81-
return [text]
82-
return [text[i : i + size] for i in range(0, len(text), size)]
83-
84-
# 2) Chunk documents and summarize each chunk
85-
full_text = "\n\n".join(doc[1] for doc in documents)
86-
chunks = chunk_text(full_text, size=10000)
93+
# Chunk documents and summarize each chunk
94+
chunk_size = settings.SUMMARIZATION_CHUNK_SIZE
95+
chunker = semchunk.chunkerify(
96+
tokenizer_or_token_counter=lambda text: len(text.split()),
97+
chunk_size=chunk_size,
98+
)
99+
documents_chunks = chunker(
100+
[doc[1] for doc in documents],
101+
overlap=settings.SUMMARIZATION_OVERLAP_SIZE,
102+
)
103+
87104
logger.info(
88105
"[summarize] chunking: %s parts (size~%s), instructions='%s'",
89-
len(chunks),
90-
10000,
91-
instructions_hint or "",
106+
sum(len(chunks) for chunks in documents_chunks),
107+
chunk_size,
108+
instructions_hint,
92109
)
93110

94-
async def summarize_chunk(idx, chunk, total_chunks, summarization_agent, ctx):
95-
sum_prompt = (
96-
"Tu es un agent spécialisé en synthèses de textes. "
97-
"Génère un résumé clair et concis du passage suivant (partie {idx}/{total}) :\n"
98-
"'''\n{context}\n'''\n\n"
99-
).format(context=chunk, idx=idx, total=total_chunks)
100-
logger.info("[summarize] CHUNK %s/%s prompt=> %s", idx, total_chunks, sum_prompt[0:100]+'...')
101-
resp = await summarization_agent.run(sum_prompt, usage=ctx.usage)
102-
logger.info("[summarize] CHUNK %s/%s response<= %s", idx, total_chunks, resp.output or "")
103-
return resp.output or ""
104-
105-
# Parallelize the chunk summarization in batches of 5 using asyncio.gather
106-
chunk_summaries: list[str] = []
107-
batch_size = 5
108-
for start_idx in range(0, len(chunks), batch_size):
109-
end_idx = start_idx + batch_size
110-
batch_chunks = chunks[start_idx:end_idx]
111+
# Parallelize the chunk summarization with a semaphore to limit concurrent tasks
112+
# because it can be very resource intensive on the LLM backend
113+
semaphore = asyncio.Semaphore(settings.SUMMARIZATION_CONCURRENT_REQUESTS)
114+
115+
async def summarize_chunk_with_semaphore(idx, chunk, total_chunks):
116+
"""Summarize a chunk with semaphore-controlled concurrency."""
117+
async with semaphore:
118+
return await summarize_chunk(idx, chunk, total_chunks, summarization_agent, ctx)
119+
120+
doc_chunk_summaries = []
121+
for doc_chunks in documents_chunks:
111122
summarization_tasks = [
112-
summarize_chunk(idx, chunk, len(chunks), summarization_agent, ctx)
113-
for idx, chunk in enumerate(batch_chunks, start=start_idx + 1)
123+
summarize_chunk_with_semaphore(idx, chunk, len(doc_chunks))
124+
for idx, chunk in enumerate(doc_chunks, start=1)
114125
]
115-
batch_results = await asyncio.gather(*summarization_tasks)
116-
chunk_summaries.extend(batch_results)
117-
118-
if not instructions_hint:
119-
instructions_hint = "Le résumé doit être en Français, contenir 2 ou 3 parties."
126+
chunk_summaries = await asyncio.gather(*summarization_tasks)
127+
doc_chunk_summaries.append(chunk_summaries)
128+
129+
context = "\n\n".join(
130+
doc_name + "\n\n" + "\n\n".join(summaries)
131+
for doc_name, summaries in zip(
132+
(doc[0] for doc in documents),
133+
doc_chunk_summaries,
134+
strict=True,
135+
)
136+
)
120137

121-
# 3) Merge chunk summaries into a single concise summary
138+
# Merge chunk summaries into a single concise summary
122139
merged_prompt = (
123-
"Produit une synthèse cohérente à partir des résumés ci-dessous.\n\n"
124-
"'''\n{context}\n'''\n\n"
125-
"Contraintes :\n"
126-
"- Résumer sans répéter.\n"
127-
"- Harmoniser le style et la terminologie.\n"
128-
"- Le résumé final doit être bien structuré et formaté en markdown. \n"
129-
"- Respecter les consignes : {instructions}\n"
130-
"Réponds directement avec le résumé final."
131-
).format(context="\n\n".join(chunk_summaries), instructions=instructions_hint or "")
132-
logger.info("[summarize] MERGE prompt=> %s", merged_prompt)
140+
"Produce a coherent synthesis from the summaries below.\n\n"
141+
f"'''\n{context}\n'''\n\n"
142+
"Constraints:\n"
143+
"- Summarize without repetition.\n"
144+
"- Harmonize style and terminology.\n"
145+
"- The final summary must be well-structured and formatted in markdown.\n"
146+
f"- Follow the instructions: {instructions_hint}\n"
147+
"Respond directly with the final summary."
148+
)
149+
150+
logger.debug("[summarize] MERGE prompt=> %s", merged_prompt)
151+
133152
merged_resp = await summarization_agent.run(merged_prompt, usage=ctx.usage)
153+
134154
final_summary = (merged_resp.output or "").strip()
135-
logger.info("[summarize] MERGE response<= %s", final_summary)
155+
156+
logger.debug("[summarize] MERGE response<= %s", final_summary)
136157

137158
return ToolReturn(
138159
return_value=final_summary,

src/backend/chat/clients/pydantic_ai.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -483,11 +483,14 @@ def force_web_search_prompt() -> str:
483483
@self.conversation_agent.system_prompt
484484
def summarization_system_prompt() -> str:
485485
return (
486-
"When the user asks to summarize attached document(s), you MUST call the"
487-
" summarize tool. Pass user's instructions if provided, otherwise pass an"
488-
" empty instructions string once the user confirms (e.g. says 'ok'). Do NOT"
489-
" call web search or document_search_rag to produce summaries; rely only on"
490-
" the attached documents stored in context."
486+
"When you receive a result from the summarization tool, you MUST return it "
487+
"directly to the user without any modification, paraphrasing, or additional "
488+
"summarization."
489+
"The tool already produces optimized summaries that should be presented "
490+
"verbatim."
491+
"You may translate the summary if required, but you MUST preserve all the "
492+
"information from the original summary."
493+
"You may add a follow-up question after the summary if needed."
491494
)
492495

493496
# Inform the model (system-level) that documents are attached and available

src/backend/chat/tests/views/chat/conversations/test_conversation_with_document_upload.py

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -374,11 +374,10 @@ async def agent_model(messages: list[ModelMessage], _info: AgentInfo):
374374
"timestamp": timezone_now,
375375
},
376376
{
377-
"content": "If the user wants specific information from a "
378-
"document, invoke web_search_albert_rag with an "
379-
"appropriate query string.Do not ask the user for the "
380-
"document; rely on the tool to locate and return "
381-
"relevant passages.",
377+
"content": "Use document_search_rag ONLY to retrieve specific "
378+
"passages from attached documents. Do NOT use it to "
379+
"summarize; for summaries, call the summarize tool "
380+
"instead.",
382381
"dynamic_ref": None,
383382
"part_kind": "system-prompt",
384383
"timestamp": timezone_now,
@@ -397,6 +396,15 @@ async def agent_model(messages: list[ModelMessage], _info: AgentInfo):
397396
"part_kind": "system-prompt",
398397
"timestamp": timezone_now,
399398
},
399+
{
400+
"content": "[Internal context] User documents are attached to this "
401+
"conversation. Do not request re-upload of documents; "
402+
"consider them already available via the internal "
403+
"store.",
404+
"dynamic_ref": None,
405+
"part_kind": "system-prompt",
406+
"timestamp": timezone_now,
407+
},
400408
{
401409
"content": ["What does the document say?"],
402410
"part_kind": "user-prompt",
@@ -627,7 +635,7 @@ async def agent_model(messages: list[ModelMessage], _info: AgentInfo):
627635
'document discusses various topics."}\n'
628636
'0:"The document discusses various topics."\n'
629637
'f:{"messageId":"<mocked_uuid>"}\n'
630-
'd:{"finishReason":"stop","usage":{"promptTokens":201,"completionTokens":13}}\n'
638+
'd:{"finishReason":"stop","usage":{"promptTokens":317,"completionTokens":19}}\n'
631639
)
632640

633641
# Check that the conversation was updated
@@ -709,11 +717,10 @@ async def agent_model(messages: list[ModelMessage], _info: AgentInfo):
709717
"timestamp": timezone_now,
710718
},
711719
{
712-
"content": "If the user wants specific information from a "
713-
"document, invoke web_search_albert_rag with an "
714-
"appropriate query string.Do not ask the user for the "
715-
"document; rely on the tool to locate and return "
716-
"relevant passages.",
720+
"content": "Use document_search_rag ONLY to retrieve specific "
721+
"passages from attached documents. Do NOT use it to "
722+
"summarize; for summaries, call the summarize tool "
723+
"instead.",
717724
"dynamic_ref": None,
718725
"part_kind": "system-prompt",
719726
"timestamp": timezone_now,
@@ -732,6 +739,15 @@ async def agent_model(messages: list[ModelMessage], _info: AgentInfo):
732739
"part_kind": "system-prompt",
733740
"timestamp": timezone_now,
734741
},
742+
{
743+
"content": "[Internal context] User documents are attached to this "
744+
"conversation. Do not request re-upload of documents; "
745+
"consider them already available via the internal "
746+
"store.",
747+
"dynamic_ref": None,
748+
"part_kind": "system-prompt",
749+
"timestamp": timezone_now,
750+
},
735751
{
736752
"content": ["Make a summary of this document."],
737753
"part_kind": "user-prompt",

src/backend/chat/tests/views/chat/conversations/test_conversation_with_document_url.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -881,10 +881,9 @@ async def agent_model(messages: list[ModelMessage], _info: AgentInfo):
881881
SystemPromptPart(content="Answer in english.", timestamp=timezone.now()),
882882
SystemPromptPart(
883883
content=(
884-
"If the user wants specific information from a document, "
885-
"invoke web_search_albert_rag with an appropriate query string."
886-
"Do not ask the user for the document; rely on the tool to locate "
887-
"and return relevant passages."
884+
"Use document_search_rag ONLY to retrieve specific passages from "
885+
"attached documents. Do NOT use it to summarize; for summaries, "
886+
"call the summarize tool instead."
888887
),
889888
timestamp=timezone.now(),
890889
),
@@ -901,6 +900,14 @@ async def agent_model(messages: list[ModelMessage], _info: AgentInfo):
901900
),
902901
timestamp=timezone.now(),
903902
),
903+
SystemPromptPart(
904+
content=(
905+
"[Internal context] User documents are attached to this conversation. "
906+
"Do not request re-upload of documents; consider them already "
907+
"available via the internal store."
908+
),
909+
timestamp=timezone.now(),
910+
),
904911
UserPromptPart(
905912
content=[
906913
"What is in this document?",
@@ -1002,11 +1009,10 @@ async def agent_model(messages: list[ModelMessage], _info: AgentInfo):
10021009
"timestamp": timestamp,
10031010
},
10041011
{
1005-
"content": "If the user wants specific information from a "
1006-
"document, invoke web_search_albert_rag with an "
1007-
"appropriate query string.Do not ask the user for the "
1008-
"document; rely on the tool to locate and return "
1009-
"relevant passages.",
1012+
"content": "Use document_search_rag ONLY to retrieve specific "
1013+
"passages from attached documents. Do NOT use it to "
1014+
"summarize; for summaries, call the summarize tool "
1015+
"instead.",
10101016
"dynamic_ref": None,
10111017
"part_kind": "system-prompt",
10121018
"timestamp": timestamp,
@@ -1025,6 +1031,15 @@ async def agent_model(messages: list[ModelMessage], _info: AgentInfo):
10251031
"part_kind": "system-prompt",
10261032
"timestamp": timestamp,
10271033
},
1034+
{
1035+
"content": "[Internal context] User documents are attached to "
1036+
"this conversation. Do not request re-upload of "
1037+
"documents; consider them already available via the "
1038+
"internal store.",
1039+
"dynamic_ref": None,
1040+
"part_kind": "system-prompt",
1041+
"timestamp": timestamp,
1042+
},
10281043
{
10291044
"content": [
10301045
"What is in this document?",

src/backend/chat/tools/document_search_rag.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@ def document_search_rag(ctx: RunContext, query: str) -> ToolReturn:
2222
ctx (RunContext): The run context containing the conversation.
2323
query (str): The query to search the documents for.
2424
"""
25-
# Defensive: ctx.deps or ctx.deps.conversation may be unavailable in some flows (start of conversation)
26-
if not getattr(ctx, "deps", None) or not getattr(ctx.deps, "conversation", None):
27-
return ToolReturn(return_value=[], content="", metadata={"sources": set()})
28-
2925
document_store_backend = import_string(settings.RAG_DOCUMENT_SEARCH_BACKEND)
3026

3127
document_store = document_store_backend(ctx.deps.conversation.collection_id)

src/backend/conversations/settings.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,21 @@ class Base(BraveSettings, Configuration):
786786
environ_name="SUMMARIZATION_SYSTEM_PROMPT",
787787
environ_prefix=None,
788788
)
789+
SUMMARIZATION_CHUNK_SIZE = values.PositiveIntegerValue(
790+
default=20_000, # Approx 20k words per chunk
791+
environ_name="SUMMARIZATION_CHUNK_SIZE",
792+
environ_prefix=None,
793+
)
794+
SUMMARIZATION_OVERLAP_SIZE = values.FloatValue(
795+
default=0.05, # 5% overlap
796+
environ_name="SUMMARIZATION_OVERLAP_SIZE",
797+
environ_prefix=None,
798+
)
799+
SUMMARIZATION_CONCURRENT_REQUESTS = values.PositiveIntegerValue(
800+
default=5,
801+
environ_name="SUMMARIZATION_CONCURRENT_REQUESTS",
802+
environ_prefix=None,
803+
)
789804

790805
# Tavily API
791806
TAVILY_API_KEY = values.Value(

0 commit comments

Comments
 (0)