166 changes: 107 additions & 59 deletions plugins/filters/gemini_manifold_companion.py
@@ -312,6 +312,7 @@ async def outlet(

if stored_metadata:
log.info("Found grounding metadata, processing citations.")
log.trace("Stored grounding metadata:", payload=stored_metadata)

current_content = body["messages"][-1]["content"]
if isinstance(current_content, list):
@@ -530,89 +531,134 @@ async def _resolve_and_emit_sources(
"""
initial_metadatas: list[tuple[int, str]] = []
for i, g_c in enumerate(grounding_chunks):
+ uri = None
if (web_info := g_c.web) and web_info.uri:
- initial_metadatas.append((i, web_info.uri))
+ uri = web_info.uri
+ elif (maps_info := g_c.maps) and maps_info.uri:
+ uri = maps_info.uri

+ if uri:
+ initial_metadatas.append((i, uri))

if not initial_metadatas:
log.info("No source URIs found, skipping URL resolution.")
log.info("No source URIs found, skipping source emission.")
return

- num_urls = len(initial_metadatas)
- self._emit_status_update(
- event_emitter,
- f"Resolving {num_urls} source URLs...",
- pipe_start_time,
- )

- source_metadatas_template: list["SourceMetadata"] = [
- {"source": None, "original_url": None, "supports": []}
- for _ in grounding_chunks
+ urls_to_resolve = [
+ uri
+ for _, uri in initial_metadatas
+ if uri.startswith(
+ "https://vertexaisearch.cloud.google.com/grounding-api-redirect/"
+ )
]
+ resolved_uris_map = {}

- try:
- urls_to_resolve = [url for _, url in initial_metadatas]
- log.info(f"Resolving {len(urls_to_resolve)} source URLs...")
- async with aiohttp.ClientSession() as session:
- tasks = [self._resolve_url(session, url) for url in urls_to_resolve]
- results = await asyncio.gather(*tasks)
- log.info("URL resolution completed.")

- resolved_uris = [res[0] for res in results]
- resolved_uris_map = dict(zip(urls_to_resolve, resolved_uris))

- # Emit final status based on resolution results.
- success_count = sum(1 for _, success in results if success)
- final_status_msg = (
- "URL resolution complete"
- if success_count == num_urls
- else f"Resolved {success_count}/{num_urls} URLs"
- )
+ if urls_to_resolve:
+ num_urls = len(urls_to_resolve)
self._emit_status_update(
- event_emitter, final_status_msg, pipe_start_time, done=True
+ event_emitter,
+ f"Resolving {num_urls} source URLs...",
+ pipe_start_time,
)

- except Exception as e:
- log.error(f"Error during URL resolution: {e}")
- resolved_uris_map = {url: url for _, url in initial_metadatas}
+ try:
+ log.info(f"Resolving {num_urls} source URLs...")
+ async with aiohttp.ClientSession() as session:
+ tasks = [self._resolve_url(session, url) for url in urls_to_resolve]
+ results = await asyncio.gather(*tasks)
+ log.info("URL resolution completed.")

+ resolved_uris = [res[0] for res in results]
+ resolved_uris_map = dict(zip(urls_to_resolve, resolved_uris))

+ success_count = sum(1 for _, success in results if success)
+ final_status_msg = (
+ "URL resolution complete"
+ if success_count == num_urls
+ else f"Resolved {success_count}/{num_urls} URLs"
+ )
+ self._emit_status_update(
+ event_emitter, final_status_msg, pipe_start_time, done=True
+ )

- # Emit failure status.
- self._emit_status_update(
- event_emitter, "URL resolution failed", pipe_start_time, done=True
- )
+ except Exception as e:
+ log.error(f"Error during URL resolution: {e}")
+ resolved_uris_map = {url: url for url in urls_to_resolve}
+ self._emit_status_update(
+ event_emitter, "URL resolution failed", pipe_start_time, done=True
+ )

source_metadatas_template: list["SourceMetadata"] = [
{"source": None, "original_url": None, "supports": []}
for _ in grounding_chunks
]
populated_metadatas = [m.copy() for m in source_metadatas_template]

for chunk_index, original_uri in initial_metadatas:
- resolved_uri = resolved_uris_map.get(original_uri, original_uri)
+ final_uri = resolved_uris_map.get(original_uri, original_uri)
if 0 <= chunk_index < len(populated_metadatas):
populated_metadatas[chunk_index]["original_url"] = original_uri
populated_metadatas[chunk_index]["source"] = resolved_uri
populated_metadatas[chunk_index]["source"] = final_uri
else:
log.warning(
f"Chunk index {chunk_index} out of bounds when populating resolved URLs."
)

# Create a mapping from each chunk index to the text segments it supports.
chunk_index_to_segments: dict[int, list[types.Segment]] = {}
for support in supports:
segment = support.segment
indices = support.grounding_chunk_indices
- if not (indices is not None and segment and segment.end_index is not None):
+ if not (segment and segment.text and indices is not None):
continue
- for index in indices:
- if 0 <= index < len(populated_metadatas):
- populated_metadatas[index]["supports"].append(support.model_dump()) # type: ignore
- else:
- log.warning(
- f"Invalid grounding chunk index {index} found in support during background processing."
- )

- valid_source_metadatas = [
- m for m in populated_metadatas if m.get("original_url") is not None
- ]
+ for index in indices:
+ if index not in chunk_index_to_segments:
+ chunk_index_to_segments[index] = []
+ chunk_index_to_segments[index].append(segment)
+ populated_metadatas[index]["supports"].append(support.model_dump()) # type: ignore

valid_source_metadatas: list["SourceMetadata"] = []
doc_list: list[str] = []

for i, meta in enumerate(populated_metadatas):
if meta.get("original_url") is not None:
valid_source_metadatas.append(meta)

content_parts: list[str] = []
chunk = grounding_chunks[i]

if maps_info := chunk.maps:
title = maps_info.title or "N/A"
place_id = maps_info.place_id or "N/A"
content_parts.append(f"Title: {title}\nPlace ID: {place_id}")

supported_segments = chunk_index_to_segments.get(i)
if supported_segments:
if content_parts:
content_parts.append("") # Add a blank line for separation

# Use a set to show each unique snippet only once per source.
unique_snippets = {
(seg.text, seg.start_index, seg.end_index)
for seg in supported_segments
if seg.text is not None
}

# Sort snippets by their appearance in the text.
sorted_snippets = sorted(unique_snippets, key=lambda s: s[1] or 0)

snippet_strs = [
f'- "{text}" (Indices: {start}-{end})'
for text, start, end in sorted_snippets
]
content_parts.append("Supported text snippets:")
content_parts.extend(snippet_strs)

doc_list.append("\n".join(content_parts))

sources_list: list["Source"] = []
if valid_source_metadatas:
doc_list = [""] * len(valid_source_metadatas)
sources_list.append(
{
"source": {"name": "web_search"},
@@ -627,41 +673,43 @@ async def _resolve_and_emit_sources(
}
await event_emitter(event)
log.info("Emitted sources event.")
log.debug("ChatCompletionEvent:", payload=event)
log.trace("ChatCompletionEvent:", payload=event)

async def _emit_status_event_w_queries(
self,
grounding_metadata: types.GroundingMetadata,
event_emitter: Callable[["Event"], Awaitable[None]],
) -> None:
"""
- Creates a StatusEvent with Google search URLs based on the web_search_queries
- in the GenerateContentResponse.
+ Creates a StatusEvent with search URLs based on the web_search_queries
+ in the GroundingMetadata. This covers both Google Search and Google Maps grounding.
"""
if not grounding_metadata.web_search_queries:
log.warning("Grounding metadata does not contain any search queries.")
log.debug("Grounding metadata does not contain any search queries.")
return

search_queries = grounding_metadata.web_search_queries
if not search_queries:
log.debug("web_search_queries list is empty.")
return

# The queries are used for grounding, so we link them to a general Google search page.
google_search_urls = [
f"https://www.google.com/search?q={query}" for query in search_queries
]

status_event_data: StatusEventData = {
"action": "web_search",
"description": "This response was grounded with Google Search",
"description": "This response was grounded with a Google tool",
"urls": google_search_urls,
}
status_event: StatusEvent = {
"type": "status",
"data": status_event_data,
}
await event_emitter(status_event)
log.info("Emitted search queries.")
log.debug("StatusEvent:", payload=status_event)
log.info("Emitted grounding queries.")
log.trace("StatusEvent:", payload=status_event)

# endregion 1.1 Add citations
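The citation code above awaits `self._resolve_url(session, url)` for each redirect link, but that helper sits outside the shown hunks. As a rough sketch only: the snippet below assumes nothing beyond the `(final_url, success)` return shape implied by `resolved_uris = [res[0] for res in results]` and `success_count = sum(1 for _, success in results if success)`; the plugin's real helper is a method on the filter class and may be implemented differently.

```python
import aiohttp


async def resolve_url(session: aiohttp.ClientSession, url: str) -> tuple[str, bool]:
    """Follow a grounding-api-redirect link and return (final_url, success)."""
    try:
        # aiohttp follows redirects by default; response.url is the final landing URL.
        async with session.get(url, allow_redirects=True) as response:
            return str(response.url), True
    except aiohttp.ClientError:
        # On failure, fall back to the original redirect URL and flag the miss.
        return url, False
```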

46 changes: 46 additions & 0 deletions plugins/filters/gemini_map_grounding_toggle.py
@@ -0,0 +1,46 @@
"""
title: Google Maps Grounding
id: gemini_maps_grounding_toggle
description: Ground the model's response in Google Maps data
author: suurt8ll
author_url: https://github.com/suurt8ll
funding_url: https://github.com/suurt8ll/open_webui_functions
license: MIT
version: 1.0.0
"""

from typing import TYPE_CHECKING, cast
from pydantic import BaseModel

# This block is skipped at runtime.
if TYPE_CHECKING:
# Imports custom type definitions (TypedDicts) for static analysis purposes (mypy/pylance).
from utils.manifold_types import *


class Filter:
class Valves(BaseModel):
pass

def __init__(self) -> None:
self.valves = self.Valves()
# Makes the filter toggleable in the front-end.
self.toggle = True
# Icon from https://icon-sets.iconify.design/line-md/map-marker/
self.icon = "data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSIyNCIgaGVpZ2h0PSIyNCIgdmlld0JveD0iMCAwIDI0IDI0Ij48Y2lyY2xlIGN4PSIxMiIgY3k9IjkiIHI9IjIuNSIgZmlsbD0iY3VycmVudENvbG9yIi8+PHBhdGggZmlsbD0ibm9uZSIgc3Ryb2tlPSJjdXJyZW50Q29sb3IiIHN0cm9rZS1saW5lY2FwPSJyb3VuZCIgc3Ryb2tlLWxpbmVqb2luPSJyb3VuZCIgc3Ryb2tlLXdpZHRoPSIyIiBkPSJNMTIgMjAuNWMwIDAgLTYgLTcgLTYgLTExLjVjMCAtMy4zMSAyLjY5IC02IDYgLTZjMy4zMSAwIDYgMi42OSA2IDZjMCA0LjUgLTYgMTEuNSAtNiAxMS41WiIvPjwvc3ZnPg=="

async def inlet(
self,
body: "Body",
) -> "Body":
# Signal downstream Gemini Manifold pipe that Maps grounding is enabled.

# Ensure features field exists
metadata = body.get("metadata")
metadata_features = metadata.get("features")
if metadata_features is None:
metadata_features = cast(Features, {})
metadata["features"] = metadata_features

metadata_features["google_maps_grounding"] = True
return body
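A rough sketch of how a downstream pipe could read the flag this filter sets; the actual Gemini Manifold pipe's handling of `google_maps_grounding` is not part of this diff, so the function name and defensive lookups below are illustrative only.

```python
# Illustrative only: check the feature flag set by the toggle filter's inlet.
# The real Gemini Manifold pipe's consumption logic is not shown in this PR.
def maps_grounding_requested(body: dict) -> bool:
    features = (body.get("metadata") or {}).get("features") or {}
    return bool(features.get("google_maps_grounding"))
```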