From 2256d2a94ac2ac0dc65d3620f508f3e271fb1056 Mon Sep 17 00:00:00 2001 From: Dmytro Borysenko Date: Thu, 7 May 2026 10:38:19 -0500 Subject: [PATCH 1/3] =?UTF-8?q?perf:=20consolidate=20tool=20metrics=20quer?= =?UTF-8?q?ies=20(11=20=E2=86=92=204=20pipelines)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous implementation ran 11 sequential MongoDB aggregations, each starting with $unwind on the full content array. On DocumentDB this is very expensive because the content array can contain large blobs (images, long text) that must be materialised before the $match on content.type can filter them out. New approach: - Pre-filter documents at the collection level using $match on content.type (hits the multikey index, skips non-tool messages). - Use $project + $filter to extract a tiny "_tc" array containing only the tool_call items before unwinding — typically 1-3 elements instead of 10-50+. - Query A: single all-time pass that accumulates total calls, per-tool, per-model, and per-endpoint counts in one pipeline. - Query B: all-time error counts; uses $indexOfCP instead of $regex to avoid a full string scan on every output blob (DocumentDB-compatible). - Query C: 5-minute window — total calls, per-tool, and errors in one pipeline; time filter is placed first to hit the updatedAt index. - Query D: messages_with_tools via count_documents (index-only, no aggregation); active_tool_users with a minimal group pipeline. Co-authored-by: Cursor --- metrics.py | 299 +++++++++++++++++++++++++++++------------------------ 1 file changed, 162 insertions(+), 137 deletions(-) diff --git a/metrics.py b/metrics.py index aec46b4..fdc525c 100644 --- a/metrics.py +++ b/metrics.py @@ -1227,12 +1227,32 @@ def collect_model_tag_combinations(self): def _fetch_all_tool_metrics(self): """ - Fetch all tool metrics using separate MongoDB aggregation queries. - This approach is compatible with AWS DocumentDB and other MongoDB-compatible databases. - Uses sequential queries to gather all tool usage data efficiently. + Fetch all tool metrics using consolidated MongoDB aggregation queries. + + DocumentDB / large-collection strategy + ---------------------------------------- + The naive approach is to $unwind the entire `content` array and then + $match on content.type == "tool_call". This is catastrophically slow + because the content array can contain large blobs (images, long text) + and DocumentDB must materialise every element before filtering. + + Instead we: + 1. Pre-filter at the document level with a $match that uses the + multikey index on content.type (only messages that have ≥1 + tool_call item are fetched). + 2. Use $project + $filter to extract just the tool_call items into a + tiny derived array "_tc" — one small object per call, not the + full content blob. + 3. $unwind the tiny "_tc" array (typically 1-3 items vs 10-50+). + 4. $group on that cheap unwound field. + + This collapses what used to be 11 sequential queries each doing a full + content-array expansion into 4 queries that work on a fraction of the + data. """ try: five_minutes_ago = datetime.now(timezone.utc) - timedelta(minutes=5) + has_tool_call = {"content.type": "tool_call"} # Initialize cache cache = { @@ -1249,164 +1269,169 @@ def _fetch_all_tool_metrics(self): 'active_tool_users': 0 } - # Query 1: Total tool calls - total_calls_pipeline = [ - {"$unwind": "$content"}, - {"$match": {"content.type": "tool_call"}}, - {"$count": "count"} - ] - total_calls_result = list(self.messages_collection.aggregate(total_calls_pipeline)) - if total_calls_result: - cache['total_calls'] = total_calls_result[0]['count'] - - # Query 2: Tool calls per tool name - per_tool_pipeline = [ - {"$unwind": "$content"}, - {"$match": {"content.type": "tool_call", "content.tool_call.name": {"$exists": True}}}, - {"$group": {"_id": "$content.tool_call.name", "count": {"$sum": 1}}} - ] - for item in self.messages_collection.aggregate(per_tool_pipeline): - tool = item['_id'] or 'unknown' - cache['per_tool'][tool] = item['count'] - - # Query 3: Tool calls per model - per_model_pipeline = [ - {"$unwind": "$content"}, - { - "$match": { - "content.type": "tool_call", - "content.tool_call.name": {"$exists": True}, - "model": {"$exists": True, "$ne": None} + # Shared $project stage: keep only the fields we need and shrink + # the content array down to just {name, output} for tool_call items. + # Processing this tiny array is orders of magnitude cheaper than + # unwinding the full content array with all its blobs. + extract_tc_project = { + "$project": { + "model": 1, + "endpoint": 1, + "user": 1, + "_tc": { + "$filter": { + "input": {"$ifNull": ["$content", []]}, + "as": "c", + "cond": {"$eq": ["$$c.type", "tool_call"]} + } } - }, + } + } + + # ── Query A (all-time): total calls + per-tool + per-model + per-endpoint + # Single pipeline pass over the pre-filtered document set. + all_time_pipeline = [ + {"$match": has_tool_call}, + extract_tc_project, + {"$unwind": "$_tc"}, # unwind the tiny tool-calls-only array { "$group": { - "_id": {"model": "$model", "tool": "$content.tool_call.name"}, + "_id": { + "tool": {"$ifNull": ["$_tc.tool_call.name", "unknown"]}, + "model": {"$ifNull": ["$model", None]}, + "endpoint": {"$ifNull": ["$endpoint", None]}, + }, "count": {"$sum": 1} } } ] - for item in self.messages_collection.aggregate(per_model_pipeline): - model = item['_id'].get('model') or 'unknown' - tool = item['_id'].get('tool') or 'unknown' - key = (model, tool) - cache['per_model'][key] = item['count'] - - # Query 4: Tool calls per endpoint - per_endpoint_pipeline = [ - {"$unwind": "$content"}, + for item in self.messages_collection.aggregate(all_time_pipeline): + gid = item["_id"] + tool = gid.get("tool") or "unknown" + model = gid.get("model") or "unknown" + ep = gid.get("endpoint") or "unknown" + cnt = item["count"] + + cache["total_calls"] += cnt + cache["per_tool"][tool] = cache["per_tool"].get(tool, 0) + cnt + if gid.get("model"): + cache["per_model"][(model, tool)] = ( + cache["per_model"].get((model, tool), 0) + cnt + ) + if gid.get("endpoint"): + cache["per_endpoint"][(ep, tool)] = ( + cache["per_endpoint"].get((ep, tool), 0) + cnt + ) + + # ── Query B (all-time): error counts per tool + # Use $indexOfCP (DocumentDB-compatible) instead of $regex so we + # avoid a regex scan on every output string after unwind. + error_output_prefix = "Error processing tool" + errors_pipeline = [ + {"$match": has_tool_call}, { - "$match": { - "content.type": "tool_call", - "content.tool_call.name": {"$exists": True}, - "endpoint": {"$exists": True, "$ne": None} + "$project": { + "_tc_err": { + "$filter": { + "input": {"$ifNull": ["$content", []]}, + "as": "c", + "cond": { + "$and": [ + {"$eq": ["$$c.type", "tool_call"]}, + { + "$gte": [ + { + "$indexOfCP": [ + {"$ifNull": ["$$c.tool_call.output", ""]}, + error_output_prefix + ] + }, + 0 + ] + } + ] + } + } + } } }, + {"$match": {"_tc_err": {"$ne": []}}}, # skip docs with no errors + {"$unwind": "$_tc_err"}, { "$group": { - "_id": {"endpoint": "$endpoint", "tool": "$content.tool_call.name"}, - "count": {"$sum": 1} + "_id": {"$ifNull": ["$_tc_err.tool_call.name", "unknown"]}, + "error_count": {"$sum": 1} } } ] - for item in self.messages_collection.aggregate(per_endpoint_pipeline): - endpoint = item['_id'].get('endpoint') or 'unknown' - tool = item['_id'].get('tool') or 'unknown' - key = (endpoint, tool) - cache['per_endpoint'][key] = item['count'] - - # Query 5: Failed tool calls (errors) per tool - errors_per_tool_pipeline = [ - {"$unwind": "$content"}, - { - "$match": { - "content.type": "tool_call", - "content.tool_call.name": {"$exists": True}, - "content.tool_call.output": {"$regex": "Error processing tool", "$options": "i"} - } - }, - {"$group": {"_id": "$content.tool_call.name", "error_count": {"$sum": 1}}} - ] - for item in self.messages_collection.aggregate(errors_per_tool_pipeline): - tool = item['_id'] or 'unknown' - cache['errors_per_tool'][tool] = item['error_count'] - - # Query 6: Total errors - total_errors_pipeline = [ - {"$unwind": "$content"}, + for item in self.messages_collection.aggregate(errors_pipeline): + tool = item["_id"] or "unknown" + err_cnt = item["error_count"] + cache["errors_per_tool"][tool] = err_cnt + cache["total_errors"] += err_cnt + + # ── Query C (5-minute): total calls + per-tool + errors + # Time filter comes first so DocumentDB uses the updatedAt index + # before touching the content array at all. + has_tool_call_5m = {"updatedAt": {"$gte": five_minutes_ago}, **has_tool_call} + five_min_pipeline = [ + {"$match": has_tool_call_5m}, { - "$match": { - "content.type": "tool_call", - "content.tool_call.output": {"$regex": "Error processing tool", "$options": "i"} + "$project": { + "user": 1, + "_tc": { + "$filter": { + "input": {"$ifNull": ["$content", []]}, + "as": "c", + "cond": {"$eq": ["$$c.type", "tool_call"]} + } + } } }, - {"$count": "count"} - ] - total_errors_result = list(self.messages_collection.aggregate(total_errors_pipeline)) - if total_errors_result: - cache['total_errors'] = total_errors_result[0]['count'] - - # Query 7: Tool calls in last 5 minutes - calls_5m_pipeline = [ - {"$match": {"updatedAt": {"$gte": five_minutes_ago}}}, - {"$unwind": "$content"}, - {"$match": {"content.type": "tool_call"}}, - {"$count": "count"} - ] - calls_5m_result = list(self.messages_collection.aggregate(calls_5m_pipeline)) - if calls_5m_result: - cache['calls_5m'] = calls_5m_result[0]['count'] - - # Query 8: Tool calls per tool in last 5 minutes - per_tool_5m_pipeline = [ - {"$match": {"updatedAt": {"$gte": five_minutes_ago}}}, - {"$unwind": "$content"}, - {"$match": {"content.type": "tool_call", "content.tool_call.name": {"$exists": True}}}, - {"$group": {"_id": "$content.tool_call.name", "count": {"$sum": 1}}} - ] - for item in self.messages_collection.aggregate(per_tool_5m_pipeline): - tool = item['_id'] or 'unknown' - cache['per_tool_5m'][tool] = item['count'] - - # Query 9: Errors in last 5 minutes - errors_5m_pipeline = [ - {"$match": {"updatedAt": {"$gte": five_minutes_ago}}}, - {"$unwind": "$content"}, + {"$unwind": "$_tc"}, { - "$match": { - "content.type": "tool_call", - "content.tool_call.output": {"$regex": "Error processing tool", "$options": "i"} + "$group": { + "_id": {"$ifNull": ["$_tc.tool_call.name", "unknown"]}, + "count": {"$sum": 1}, + "error_count": { + "$sum": { + "$cond": [ + { + "$gte": [ + { + "$indexOfCP": [ + {"$ifNull": ["$_tc.tool_call.output", ""]}, + error_output_prefix + ] + }, + 0 + ] + }, + 1, + 0 + ] + } + } } - }, - {"$count": "count"} - ] - errors_5m_result = list(self.messages_collection.aggregate(errors_5m_pipeline)) - if errors_5m_result: - cache['errors_5m'] = errors_5m_result[0]['count'] - - # Query 10: Messages with tool calls - messages_with_tools_pipeline = [ - {"$match": {"content.type": "tool_call"}}, - {"$count": "count"} + } ] - messages_with_tools_result = list(self.messages_collection.aggregate(messages_with_tools_pipeline)) - if messages_with_tools_result: - cache['messages_with_tools'] = messages_with_tools_result[0]['count'] - - # Query 11: Active tool users (last 5 minutes) - active_tool_users_pipeline = [ - { - "$match": { - "updatedAt": {"$gte": five_minutes_ago}, - "content.type": "tool_call" - } - }, + for item in self.messages_collection.aggregate(five_min_pipeline): + tool = item["_id"] or "unknown" + cache["calls_5m"] += item["count"] + cache["per_tool_5m"][tool] = item["count"] + cache["errors_5m"] += item.get("error_count", 0) + + # ── Query D: messages with tool calls + active tool users (5m) + # count_documents uses the multikey index — no aggregation needed. + cache["messages_with_tools"] = self.messages_collection.count_documents(has_tool_call) + + active_tool_users_result = list(self.messages_collection.aggregate([ + {"$match": has_tool_call_5m}, {"$group": {"_id": "$user"}}, {"$count": "count"} - ] - active_tool_users_result = list(self.messages_collection.aggregate(active_tool_users_pipeline)) + ])) if active_tool_users_result: - cache['active_tool_users'] = active_tool_users_result[0]['count'] + cache["active_tool_users"] = active_tool_users_result[0]["count"] self._tool_cache = cache logger.debug("Tool metrics cached: %d tools, %d model combinations, %d endpoint combinations", From 0d4836d17fce7ac0d28e7f529ec34b24449f071f Mon Sep 17 00:00:00 2001 From: Dmytro Borysenko Date: Thu, 7 May 2026 21:02:00 -0500 Subject: [PATCH 2/3] fix: case-insensitive error matching and defensive output type cast - Replace $indexOfCP with $regexMatch (options:"i") in Query B and C to match both capitalised and lowercase error strings emitted by LibreChat (ToolService.js vs assistants/chatV1.js). - Replace $ifNull on tool_call.output with $convert (to:string, onError:"", onNull:"") to guard against non-string stored values that would cause $regexMatch to throw. Co-authored-by: Cursor --- metrics.py | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/metrics.py b/metrics.py index fdc525c..9e6b672 100644 --- a/metrics.py +++ b/metrics.py @@ -1324,9 +1324,11 @@ def _fetch_all_tool_metrics(self): ) # ── Query B (all-time): error counts per tool - # Use $indexOfCP (DocumentDB-compatible) instead of $regex so we - # avoid a regex scan on every output string after unwind. - error_output_prefix = "Error processing tool" + # $regexMatch with options:"i" matches both capitalised and lowercase + # variants emitted by LibreChat (ToolService.js vs assistants/chatV1.js). + # $convert guards against non-string output values (objects, numbers) + # that would cause $regexMatch to throw. + error_regex = "error processing tool" errors_pipeline = [ {"$match": has_tool_call}, { @@ -1339,15 +1341,11 @@ def _fetch_all_tool_metrics(self): "$and": [ {"$eq": ["$$c.type", "tool_call"]}, { - "$gte": [ - { - "$indexOfCP": [ - {"$ifNull": ["$$c.tool_call.output", ""]}, - error_output_prefix - ] - }, - 0 - ] + "$regexMatch": { + "input": {"$convert": {"input": "$$c.tool_call.output", "to": "string", "onError": "", "onNull": ""}}, + "regex": error_regex, + "options": "i" + } } ] } @@ -1397,15 +1395,11 @@ def _fetch_all_tool_metrics(self): "$sum": { "$cond": [ { - "$gte": [ - { - "$indexOfCP": [ - {"$ifNull": ["$_tc.tool_call.output", ""]}, - error_output_prefix - ] - }, - 0 - ] + "$regexMatch": { + "input": {"$convert": {"input": "$_tc.tool_call.output", "to": "string", "onError": "", "onNull": ""}}, + "regex": error_regex, + "options": "i" + } }, 1, 0 From d12d36e2d8d5c4569db9e8e818be9d4a51c0b328 Mon Sep 17 00:00:00 2001 From: Dmytro Borysenko Date: Thu, 7 May 2026 21:31:03 -0500 Subject: [PATCH 3/3] fix: resolve flake8 linting errors - Remove alignment whitespace in Query A and C result loops (E221, E272) - Wrap long $convert expressions across multiple lines (E501) Co-authored-by: Cursor --- metrics.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/metrics.py b/metrics.py index 9e6b672..82e1576 100644 --- a/metrics.py +++ b/metrics.py @@ -1306,11 +1306,11 @@ def _fetch_all_tool_metrics(self): } ] for item in self.messages_collection.aggregate(all_time_pipeline): - gid = item["_id"] - tool = gid.get("tool") or "unknown" - model = gid.get("model") or "unknown" - ep = gid.get("endpoint") or "unknown" - cnt = item["count"] + gid = item["_id"] + tool = gid.get("tool") or "unknown" + model = gid.get("model") or "unknown" + ep = gid.get("endpoint") or "unknown" + cnt = item["count"] cache["total_calls"] += cnt cache["per_tool"][tool] = cache["per_tool"].get(tool, 0) + cnt @@ -1342,7 +1342,12 @@ def _fetch_all_tool_metrics(self): {"$eq": ["$$c.type", "tool_call"]}, { "$regexMatch": { - "input": {"$convert": {"input": "$$c.tool_call.output", "to": "string", "onError": "", "onNull": ""}}, + "input": {"$convert": { + "input": "$$c.tool_call.output", + "to": "string", + "onError": "", + "onNull": "" + }}, "regex": error_regex, "options": "i" } @@ -1396,7 +1401,12 @@ def _fetch_all_tool_metrics(self): "$cond": [ { "$regexMatch": { - "input": {"$convert": {"input": "$_tc.tool_call.output", "to": "string", "onError": "", "onNull": ""}}, + "input": {"$convert": { + "input": "$_tc.tool_call.output", + "to": "string", + "onError": "", + "onNull": "" + }}, "regex": error_regex, "options": "i" } @@ -1411,9 +1421,9 @@ def _fetch_all_tool_metrics(self): ] for item in self.messages_collection.aggregate(five_min_pipeline): tool = item["_id"] or "unknown" - cache["calls_5m"] += item["count"] + cache["calls_5m"] += item["count"] cache["per_tool_5m"][tool] = item["count"] - cache["errors_5m"] += item.get("error_count", 0) + cache["errors_5m"] += item.get("error_count", 0) # ── Query D: messages with tool calls + active tool users (5m) # count_documents uses the multikey index — no aggregation needed.