Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 166 additions & 137 deletions metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -1227,12 +1227,32 @@ def collect_model_tag_combinations(self):

def _fetch_all_tool_metrics(self):
"""
Fetch all tool metrics using separate MongoDB aggregation queries.
This approach is compatible with AWS DocumentDB and other MongoDB-compatible databases.
Uses sequential queries to gather all tool usage data efficiently.
Fetch all tool metrics using consolidated MongoDB aggregation queries.

DocumentDB / large-collection strategy
----------------------------------------
The naive approach is to $unwind the entire `content` array and then
$match on content.type == "tool_call". This is catastrophically slow
because the content array can contain large blobs (images, long text)
and DocumentDB must materialise every element before filtering.

Instead we:
1. Pre-filter at the document level with a $match that uses the
multikey index on content.type (only messages that have ≥1
tool_call item are fetched).
2. Use $project + $filter to extract just the tool_call items into a
tiny derived array "_tc" — one small object per call, not the
full content blob.
3. $unwind the tiny "_tc" array (typically 1-3 items vs 10-50+).
4. $group on that cheap unwound field.

This collapses what used to be 11 sequential queries each doing a full
content-array expansion into 4 queries that work on a fraction of the
data.
"""
try:
five_minutes_ago = datetime.now(timezone.utc) - timedelta(minutes=5)
has_tool_call = {"content.type": "tool_call"}

# Initialize cache
cache = {
Expand All @@ -1249,164 +1269,173 @@ def _fetch_all_tool_metrics(self):
'active_tool_users': 0
}

# Query 1: Total tool calls
total_calls_pipeline = [
{"$unwind": "$content"},
{"$match": {"content.type": "tool_call"}},
{"$count": "count"}
]
total_calls_result = list(self.messages_collection.aggregate(total_calls_pipeline))
if total_calls_result:
cache['total_calls'] = total_calls_result[0]['count']

# Query 2: Tool calls per tool name
per_tool_pipeline = [
{"$unwind": "$content"},
{"$match": {"content.type": "tool_call", "content.tool_call.name": {"$exists": True}}},
{"$group": {"_id": "$content.tool_call.name", "count": {"$sum": 1}}}
]
for item in self.messages_collection.aggregate(per_tool_pipeline):
tool = item['_id'] or 'unknown'
cache['per_tool'][tool] = item['count']

# Query 3: Tool calls per model
per_model_pipeline = [
{"$unwind": "$content"},
{
"$match": {
"content.type": "tool_call",
"content.tool_call.name": {"$exists": True},
"model": {"$exists": True, "$ne": None}
# Shared $project stage: keep only the fields we need and shrink
# the content array down to just {name, output} for tool_call items.
# Processing this tiny array is orders of magnitude cheaper than
# unwinding the full content array with all its blobs.
extract_tc_project = {
"$project": {
"model": 1,
"endpoint": 1,
"user": 1,
"_tc": {
"$filter": {
"input": {"$ifNull": ["$content", []]},
"as": "c",
"cond": {"$eq": ["$$c.type", "tool_call"]}
}
}
},
}
}

# ── Query A (all-time): total calls + per-tool + per-model + per-endpoint
# Single pipeline pass over the pre-filtered document set.
all_time_pipeline = [
{"$match": has_tool_call},
extract_tc_project,
{"$unwind": "$_tc"}, # unwind the tiny tool-calls-only array
{
"$group": {
"_id": {"model": "$model", "tool": "$content.tool_call.name"},
"_id": {
"tool": {"$ifNull": ["$_tc.tool_call.name", "unknown"]},
"model": {"$ifNull": ["$model", None]},
"endpoint": {"$ifNull": ["$endpoint", None]},
},
"count": {"$sum": 1}
}
}
]
for item in self.messages_collection.aggregate(per_model_pipeline):
model = item['_id'].get('model') or 'unknown'
tool = item['_id'].get('tool') or 'unknown'
key = (model, tool)
cache['per_model'][key] = item['count']

# Query 4: Tool calls per endpoint
per_endpoint_pipeline = [
{"$unwind": "$content"},
for item in self.messages_collection.aggregate(all_time_pipeline):
gid = item["_id"]
tool = gid.get("tool") or "unknown"
model = gid.get("model") or "unknown"
ep = gid.get("endpoint") or "unknown"
cnt = item["count"]

cache["total_calls"] += cnt
cache["per_tool"][tool] = cache["per_tool"].get(tool, 0) + cnt
if gid.get("model"):
cache["per_model"][(model, tool)] = (
cache["per_model"].get((model, tool), 0) + cnt
)
if gid.get("endpoint"):
cache["per_endpoint"][(ep, tool)] = (
cache["per_endpoint"].get((ep, tool), 0) + cnt
)

# ── Query B (all-time): error counts per tool
# $regexMatch with options:"i" matches both capitalised and lowercase
# variants emitted by LibreChat (ToolService.js vs assistants/chatV1.js).
# $convert guards against non-string output values (objects, numbers)
# that would cause $regexMatch to throw.
error_regex = "error processing tool"
errors_pipeline = [
{"$match": has_tool_call},
{
"$match": {
"content.type": "tool_call",
"content.tool_call.name": {"$exists": True},
"endpoint": {"$exists": True, "$ne": None}
"$project": {
"_tc_err": {
"$filter": {
"input": {"$ifNull": ["$content", []]},
"as": "c",
"cond": {
"$and": [
{"$eq": ["$$c.type", "tool_call"]},
{
"$regexMatch": {
"input": {"$convert": {
"input": "$$c.tool_call.output",
"to": "string",
"onError": "",
"onNull": ""
}},
"regex": error_regex,
"options": "i"
}
}
]
}
}
}
}
},
{"$match": {"_tc_err": {"$ne": []}}}, # skip docs with no errors
{"$unwind": "$_tc_err"},
{
"$group": {
"_id": {"endpoint": "$endpoint", "tool": "$content.tool_call.name"},
"count": {"$sum": 1}
"_id": {"$ifNull": ["$_tc_err.tool_call.name", "unknown"]},
"error_count": {"$sum": 1}
}
}
]
for item in self.messages_collection.aggregate(per_endpoint_pipeline):
endpoint = item['_id'].get('endpoint') or 'unknown'
tool = item['_id'].get('tool') or 'unknown'
key = (endpoint, tool)
cache['per_endpoint'][key] = item['count']

# Query 5: Failed tool calls (errors) per tool
errors_per_tool_pipeline = [
{"$unwind": "$content"},
{
"$match": {
"content.type": "tool_call",
"content.tool_call.name": {"$exists": True},
"content.tool_call.output": {"$regex": "Error processing tool", "$options": "i"}
}
},
{"$group": {"_id": "$content.tool_call.name", "error_count": {"$sum": 1}}}
]
for item in self.messages_collection.aggregate(errors_per_tool_pipeline):
tool = item['_id'] or 'unknown'
cache['errors_per_tool'][tool] = item['error_count']

# Query 6: Total errors
total_errors_pipeline = [
{"$unwind": "$content"},
for item in self.messages_collection.aggregate(errors_pipeline):
tool = item["_id"] or "unknown"
err_cnt = item["error_count"]
cache["errors_per_tool"][tool] = err_cnt
cache["total_errors"] += err_cnt

# ── Query C (5-minute): total calls + per-tool + errors
# Time filter comes first so DocumentDB uses the updatedAt index
# before touching the content array at all.
has_tool_call_5m = {"updatedAt": {"$gte": five_minutes_ago}, **has_tool_call}
five_min_pipeline = [
{"$match": has_tool_call_5m},
{
"$match": {
"content.type": "tool_call",
"content.tool_call.output": {"$regex": "Error processing tool", "$options": "i"}
"$project": {
"user": 1,
"_tc": {
"$filter": {
"input": {"$ifNull": ["$content", []]},
"as": "c",
"cond": {"$eq": ["$$c.type", "tool_call"]}
}
}
}
},
{"$count": "count"}
]
total_errors_result = list(self.messages_collection.aggregate(total_errors_pipeline))
if total_errors_result:
cache['total_errors'] = total_errors_result[0]['count']

# Query 7: Tool calls in last 5 minutes
calls_5m_pipeline = [
{"$match": {"updatedAt": {"$gte": five_minutes_ago}}},
{"$unwind": "$content"},
{"$match": {"content.type": "tool_call"}},
{"$count": "count"}
]
calls_5m_result = list(self.messages_collection.aggregate(calls_5m_pipeline))
if calls_5m_result:
cache['calls_5m'] = calls_5m_result[0]['count']

# Query 8: Tool calls per tool in last 5 minutes
per_tool_5m_pipeline = [
{"$match": {"updatedAt": {"$gte": five_minutes_ago}}},
{"$unwind": "$content"},
{"$match": {"content.type": "tool_call", "content.tool_call.name": {"$exists": True}}},
{"$group": {"_id": "$content.tool_call.name", "count": {"$sum": 1}}}
]
for item in self.messages_collection.aggregate(per_tool_5m_pipeline):
tool = item['_id'] or 'unknown'
cache['per_tool_5m'][tool] = item['count']

# Query 9: Errors in last 5 minutes
errors_5m_pipeline = [
{"$match": {"updatedAt": {"$gte": five_minutes_ago}}},
{"$unwind": "$content"},
{"$unwind": "$_tc"},
{
"$match": {
"content.type": "tool_call",
"content.tool_call.output": {"$regex": "Error processing tool", "$options": "i"}
"$group": {
"_id": {"$ifNull": ["$_tc.tool_call.name", "unknown"]},
"count": {"$sum": 1},
"error_count": {
"$sum": {
"$cond": [
{
"$regexMatch": {
"input": {"$convert": {
"input": "$_tc.tool_call.output",
"to": "string",
"onError": "",
"onNull": ""
}},
"regex": error_regex,
"options": "i"
}
},
1,
0
]
}
}
}
},
{"$count": "count"}
]
errors_5m_result = list(self.messages_collection.aggregate(errors_5m_pipeline))
if errors_5m_result:
cache['errors_5m'] = errors_5m_result[0]['count']

# Query 10: Messages with tool calls
messages_with_tools_pipeline = [
{"$match": {"content.type": "tool_call"}},
{"$count": "count"}
}
]
messages_with_tools_result = list(self.messages_collection.aggregate(messages_with_tools_pipeline))
if messages_with_tools_result:
cache['messages_with_tools'] = messages_with_tools_result[0]['count']

# Query 11: Active tool users (last 5 minutes)
active_tool_users_pipeline = [
{
"$match": {
"updatedAt": {"$gte": five_minutes_ago},
"content.type": "tool_call"
}
},
for item in self.messages_collection.aggregate(five_min_pipeline):
tool = item["_id"] or "unknown"
cache["calls_5m"] += item["count"]
cache["per_tool_5m"][tool] = item["count"]
cache["errors_5m"] += item.get("error_count", 0)

# ── Query D: messages with tool calls + active tool users (5m)
# count_documents uses the multikey index — no aggregation needed.
cache["messages_with_tools"] = self.messages_collection.count_documents(has_tool_call)

active_tool_users_result = list(self.messages_collection.aggregate([
{"$match": has_tool_call_5m},
{"$group": {"_id": "$user"}},
{"$count": "count"}
]
active_tool_users_result = list(self.messages_collection.aggregate(active_tool_users_pipeline))
]))
if active_tool_users_result:
cache['active_tool_users'] = active_tool_users_result[0]['count']
cache["active_tool_users"] = active_tool_users_result[0]["count"]

self._tool_cache = cache
logger.debug("Tool metrics cached: %d tools, %d model combinations, %d endpoint combinations",
Expand Down