197 changes: 103 additions & 94 deletions bases/lif/semantic_search_mcp_server/core.py
@@ -1,24 +1,32 @@
import json
"""
LIF Semantic Search MCP Server.

This module provides an MCP (Model Context Protocol) server for semantic search
over LIF data fields. It dynamically loads the schema from MDR at startup
(falling back to file if MDR is unavailable).

Features:
- Schema loading from MDR with file fallback
- Dynamic filter and mutation model generation
- Semantic search over schema leaves
- Health and status endpoints
- Schema refresh capability
"""

import os
import sys
from typing import Annotated, List

from fastmcp import FastMCP
from pydantic import Field
from sentence_transformers import SentenceTransformer
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from starlette.responses import JSONResponse, PlainTextResponse

from lif.lif_schema_config import LIFSchemaConfig, DEFAULT_ATTRIBUTE_KEYS
from lif.lif_schema_config import DEFAULT_ATTRIBUTE_KEYS, LIFSchemaConfig
from lif.logging import get_logger
from lif.mdr_client import get_openapi_lif_data_model_from_file
from lif.openapi_schema_parser import load_schema_leaves
from lif.schema_state_manager import SchemaStateManager
from lif.semantic_search_service.core import (
build_embeddings,
run_semantic_search,
build_dynamic_filter_model,
build_dynamic_mutation_model,
run_mutation,
run_semantic_search,
)

logger = get_logger(__name__)
@@ -30,126 +38,127 @@
CONFIG = LIFSchemaConfig.from_environment()

# Extract values for convenience
ROOT_NODES = CONFIG.all_root_types
DEFAULT_ROOT_NODE = CONFIG.root_type_name
LIF_GRAPHQL_API_URL = os.getenv("LIF_GRAPHQL_API_URL", "http://localhost:8002/graphql")
TOP_K = CONFIG.semantic_search_top_k
MODEL_NAME = CONFIG.semantic_search_model_name

ATTRIBUTE_KEYS = DEFAULT_ATTRIBUTE_KEYS


# --------- LOAD VECTOR STORE & EMBEDDINGS ---------

# get the OpenAPI specification for the LIF data model from the MDR
openapi = get_openapi_lif_data_model_from_file()

# Load schema leaves for all root nodes
ALL_LEAVES: List = []
LEAVES_BY_ROOT: dict = {}

for root_node in ROOT_NODES:
try:
root_leaves = load_schema_leaves(openapi, root_node, attribute_keys=ATTRIBUTE_KEYS)
LEAVES_BY_ROOT[root_node] = root_leaves
ALL_LEAVES.extend(root_leaves)
logger.info(f"Loaded {len(root_leaves)} schema leaves for root '{root_node}'")
except Exception as e:
# Primary root (Person) is required; additional roots are optional
if root_node == DEFAULT_ROOT_NODE:
logger.critical(f"Failed to load schema leaves for required root '{root_node}': {e}")
sys.exit(1)
else:
logger.warning(f"Failed to load schema leaves for optional root '{root_node}': {e}")

logger.info(f"Total schema leaves loaded: {len(ALL_LEAVES)}")

# Build filter and mutation models for each root
FILTER_MODELS: dict = {}
MUTATION_MODELS: dict = {}

for root_node, root_leaves in LEAVES_BY_ROOT.items():
try:
filter_model = build_dynamic_filter_model(root_leaves)
if root_node in filter_model:
FILTER_MODELS[root_node] = filter_model[root_node]
logger.info(f"Dynamic Filter Schema for '{root_node}':\n" + json.dumps(filter_model[root_node].model_json_schema(), indent=2))
except Exception as e:
logger.warning(f"Failed to build dynamic filter model for '{root_node}': {e}")

try:
mutation_model = build_dynamic_mutation_model(root_leaves)
if root_node in mutation_model:
MUTATION_MODELS[root_node] = mutation_model[root_node]
logger.info(f"Dynamic Mutation Schema for '{root_node}':\n" + json.dumps(mutation_model[root_node].model_json_schema(), indent=2))
except Exception as e:
logger.warning(f"Failed to build dynamic mutation model for '{root_node}': {e}")

# Use default root for backwards compatibility
Filter = FILTER_MODELS.get(DEFAULT_ROOT_NODE)
MutationModel = MUTATION_MODELS.get(DEFAULT_ROOT_NODE)
# --------- SCHEMA STATE MANAGER ---------

if not Filter:
logger.critical(f"Failed to build filter model for default root '{DEFAULT_ROOT_NODE}'")
sys.exit(1)
# Initialize the state manager synchronously at module load time
# This is required because FastMCP tool decorators need the Pydantic models
# to be available at decoration time
_state_manager = SchemaStateManager(CONFIG, attribute_keys=ATTRIBUTE_KEYS)
_state_manager.initialize_sync(force_file=CONFIG.use_openapi_from_file)
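For context on the comment above: FastMCP reads the Annotated parameter types of a tool when the @mcp.tool decorator runs at import time, so the dynamically generated Pydantic models must already exist at that point. The sketch below is illustrative only; ExampleFilter, example_query, and sketch_mcp are hypothetical stand-ins, not code from this change.

from typing import Annotated, List

from fastmcp import FastMCP
from pydantic import BaseModel, Field

sketch_mcp = FastMCP(name="sketch-server")


class ExampleFilter(BaseModel):
    """Stand-in for the dynamically generated Filter model."""

    surname: str | None = None


@sketch_mcp.tool(name="example_query", description="Illustrative only")
async def example_query(
    filter: Annotated[ExampleFilter, Field(description="Example filter")],
) -> List[dict]:
    # ExampleFilter must already be defined here: the decorator inspects this
    # signature as soon as the module is imported, not when the tool is called.
    return []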

try:
MODEL = SentenceTransformer(MODEL_NAME)
except Exception as e:
logger.critical(f"Failed to load SentenceTransformer model: {e}")
sys.exit(1)
# Get the state for use in tool definitions
_state = _state_manager.state

# ------ ALWAYS embed only the descriptions for ALL leaves (all root entities) ------
EMBEDDING_TEXTS = [leaf.description for leaf in ALL_LEAVES]
logger.info(
f"Schema loaded from {_state.source}. "
f"Total leaves: {len(_state.leaves)}, "
f"Filter models: {list(_state.filter_models.keys())}"
)

# Get filter and mutation models for the default root type
Filter = _state.filter_models.get(DEFAULT_ROOT_NODE)
MutationModel = _state.mutation_models.get(DEFAULT_ROOT_NODE)

try:
EMBEDDINGS = build_embeddings(EMBEDDING_TEXTS, MODEL)
except Exception as e:
logger.critical(f"EMBEDDINGS failed: {e}")
sys.exit(1)
if not Filter:
raise RuntimeError(f"Filter model not available for root '{DEFAULT_ROOT_NODE}'")

# Keep a reference to all leaves for search
LEAVES = ALL_LEAVES

# --------- MCP SERVER SETUP ---------

mcp = FastMCP(name="LIF-Query-Server")


# --------- HTTP ENDPOINTS ---------


@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> PlainTextResponse:
"""Health check endpoint for readiness probes."""
if not _state_manager.is_initialized:
return PlainTextResponse("NOT_READY", status_code=503)
return PlainTextResponse("OK")


@mcp.custom_route("/schema/status", methods=["GET"])
async def schema_status(request: Request) -> JSONResponse:
"""Get current schema status and metadata."""
status = _state_manager.get_status()
return JSONResponse(status)


@mcp.custom_route("/schema/refresh", methods=["POST"])
async def schema_refresh(request: Request) -> JSONResponse:
"""
Trigger a schema refresh from MDR.

Note: This updates the internal state but does NOT update the MCP tool
definitions, as those are set at module load time. To use new schema
definitions for tools, the server must be restarted.
"""

[Review comment from Contributor]
Is this a phased approach? Seems less than ideal to require a restart just so
lif_query() or lif_mutation() would pick up the refreshed schema.

result = await _state_manager.refresh()
status_code = 200 if result.get("success") else 500

# Add a note about tool definitions
if result.get("success"):
result["note"] = (
"Schema state updated. Note: MCP tool definitions use the schema "
"from server startup. Restart the server to use new schema in tools."
)

return JSONResponse(result, status_code=status_code)
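
Taken together, the routes above form a small operational surface: /health for readiness probes, /schema/status for metadata, and /schema/refresh to re-pull the schema from MDR (with the restart caveat noted in the docstring). A hedged usage sketch follows; the base URL is hypothetical and depends on how the FastMCP HTTP app is deployed, and httpx is used purely for illustration rather than being a dependency declared in this diff.

import httpx  # illustrative HTTP client; not a dependency introduced by this change

BASE_URL = "http://localhost:8000"  # hypothetical address; depends on deployment

print(httpx.get(f"{BASE_URL}/health").text)           # "OK", or "NOT_READY" with status 503
print(httpx.get(f"{BASE_URL}/schema/status").json())  # current schema status and metadata

resp = httpx.post(f"{BASE_URL}/schema/refresh")
body = resp.json()
print(resp.status_code, body.get("note"))             # 200 plus the restart note on success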


# --------- MCP TOOLS ---------


@mcp.tool(
name="lif_query", description="Use this tool to run a LIF data query", annotations={"title": "Execute LIF Query"}
name="lif_query",
description="Use this tool to run a LIF data query",
annotations={"title": "Execute LIF Query"},
)
async def lif_query(
filter: Annotated[Filter, Field(description="Parameters for LIF query")],
query: Annotated[str, Field(description="Natural language query used to determine LIF data to retrieve")],
) -> List[dict]:
"""Perform a semantic search and retrieve LIF data."""
# Get current state (may have been refreshed)
state = _state_manager.state
return await run_semantic_search(
filter=filter,
query=query,
model=MODEL,
embeddings=EMBEDDINGS,
leaves=LEAVES,
model=state.model,
embeddings=state.embeddings,
leaves=state.leaves,
top_k=TOP_K,
graphql_url=LIF_GRAPHQL_API_URL,
config=CONFIG,
)


@mcp.tool(
name="lif_mutation",
description="Use this tool to mutate/update LIF data fields. You must specify a filter to select records.",
annotations={"title": "Execute LIF Mutation"},
)
async def lif_mutation(
filter: Annotated[Filter, Field(description="Filter to select records for mutation")],
input: Annotated[MutationModel, Field(description="Fields to update")],
) -> dict:
"""Run a mutation on LIF data fields."""
return await run_mutation(filter=filter, input=input, graphql_url=LIF_GRAPHQL_API_URL)
# Only register mutation tool if mutation model is available
if MutationModel:

@mcp.tool(
name="lif_mutation",
description="Use this tool to mutate/update LIF data fields. You must specify a filter to select records.",
annotations={"title": "Execute LIF Mutation"},
)
async def lif_mutation(
filter: Annotated[Filter, Field(description="Filter to select records for mutation")],
input: Annotated[MutationModel, Field(description="Fields to update")],
) -> dict:
"""Run a mutation on LIF data fields."""
return await run_mutation(filter=filter, input=input, graphql_url=LIF_GRAPHQL_API_URL)

else:
logger.warning(f"Mutation model not available for root '{DEFAULT_ROOT_NODE}', lif_mutation tool not registered")


http_app = mcp.http_app()
10 changes: 9 additions & 1 deletion components/lif/lif_schema_config/core.py
@@ -100,6 +100,7 @@ class LIFSchemaConfig:
# MDR Configuration
mdr_api_url: str = "http://localhost:8012"
mdr_api_auth_token: str = "no_auth_token_set"
mdr_timeout_seconds: int = 30
openapi_data_model_id: Optional[str] = None
openapi_json_filename: str = "openapi_constrained_with_interactions.json"
use_openapi_from_file: bool = False
@@ -129,6 +130,8 @@ def validate(self) -> None:
# Validate timeouts are positive
if self.query_timeout_seconds <= 0:
errors.append(f"query_timeout_seconds must be positive, got {self.query_timeout_seconds}")
if self.mdr_timeout_seconds <= 0:
errors.append(f"mdr_timeout_seconds must be positive, got {self.mdr_timeout_seconds}")
if self.semantic_search_timeout <= 0:
errors.append(f"semantic_search_timeout must be positive, got {self.semantic_search_timeout}")
if self.semantic_search_top_k <= 0:
@@ -155,6 +158,7 @@ def from_environment(cls) -> "LIFSchemaConfig":
LIF_QUERY_TIMEOUT_SECONDS: Query timeout in seconds
LIF_MDR_API_URL: MDR API URL
LIF_MDR_API_AUTH_TOKEN: MDR authentication token
MDR_TIMEOUT_SECONDS: Timeout for MDR API calls (default: 30)
OPENAPI_DATA_MODEL_ID: MDR data model ID
OPENAPI_JSON_FILENAME: Local OpenAPI filename
USE_OPENAPI_DATA_MODEL_FROM_FILE: Use local file instead of MDR
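
As a hedged illustration of the new setting, the snippet below drives the configuration through environment variables before building the config object. Only MDR_TIMEOUT_SECONDS, its default of 30, LIF_MDR_API_URL, and from_environment() come from this diff; the concrete values are examples and the URL is hypothetical.

import os

from lif.lif_schema_config import LIFSchemaConfig

os.environ["MDR_TIMEOUT_SECONDS"] = "60"  # new in this change; defaults to 30 when unset
os.environ["LIF_MDR_API_URL"] = "http://mdr.example.internal:8012"  # hypothetical URL

config = LIFSchemaConfig.from_environment()
assert config.mdr_timeout_seconds == 60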
@@ -192,6 +196,7 @@ def from_environment(cls) -> "LIFSchemaConfig":
# MDR
mdr_api_url=os.getenv("LIF_MDR_API_URL", "http://localhost:8012"),
mdr_api_auth_token=os.getenv("LIF_MDR_API_AUTH_TOKEN", "no_auth_token_set"),
mdr_timeout_seconds=int(os.getenv("MDR_TIMEOUT_SECONDS", "30")),
openapi_data_model_id=os.getenv("OPENAPI_DATA_MODEL_ID"),
openapi_json_filename=os.getenv(
"OPENAPI_JSON_FILENAME",
@@ -217,7 +222,10 @@ def from_environment(cls) -> "LIFSchemaConfig":
@property
def graphql_query_name(self) -> str:
"""GraphQL query field name (e.g., 'person' for root 'Person')."""
return to_graphql_query_name(self.root_type_name)
# root_type_name is validated to be non-empty, so result will never be None
result = to_graphql_query_name(self.root_type_name)
assert result is not None # Validated in __post_init__
return result

@property
def mutation_name(self) -> str:
24 changes: 22 additions & 2 deletions components/lif/mdr_client/__init__.py
@@ -1,13 +1,33 @@
from lif.mdr_client.core import (
get_openapi_lif_data_model,
# Config-based functions (preferred)
load_openapi_schema,
fetch_schema_from_mdr,
# File loading
get_openapi_lif_data_model_from_file,
# Legacy env-var based functions
get_openapi_lif_data_model,
get_openapi_lif_data_model_sync,
get_data_model_schema,
get_data_model_schema_sync,
get_data_model_transformation,
# Exceptions
MDRClientException,
MDRConfigurationError,
)

__all__ = [
"get_openapi_lif_data_model",
# Config-based functions (preferred)
"load_openapi_schema",
"fetch_schema_from_mdr",
# File loading
"get_openapi_lif_data_model_from_file",
# Legacy env-var based functions
"get_openapi_lif_data_model",
"get_openapi_lif_data_model_sync",
"get_data_model_schema",
"get_data_model_schema_sync",
"get_data_model_transformation",
# Exceptions
"MDRClientException",
"MDRConfigurationError",
]
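
To show how the reorganized exports might be consumed, here is a hedged sketch. The diff labels load_openapi_schema as the preferred config-based entry point, but its signature is not visible here, so the config argument and the exception handling below are assumptions; only get_openapi_lif_data_model_from_file() taking no arguments is confirmed elsewhere in this pull request.

from lif.lif_schema_config import LIFSchemaConfig
from lif.mdr_client import (
    MDRClientException,
    get_openapi_lif_data_model_from_file,
    load_openapi_schema,
)

config = LIFSchemaConfig.from_environment()

try:
    # Assumed signature: the preferred config-based loader takes the schema config.
    openapi = load_openapi_schema(config)
except MDRClientException:
    # Fall back to the bundled OpenAPI file, mirroring the server's startup behaviour.
    openapi = get_openapi_lif_data_model_from_file()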