From 76e309ea3302b71c937c07a542c78145770624a7 Mon Sep 17 00:00:00 2001 From: Lei Lei Date: Thu, 25 Sep 2025 10:25:28 +0800 Subject: [PATCH 1/7] first draft of mcp client. --- apps/miroflow-agent/src/core/pipeline.py | 2 +- .../src/miroflow_tools/managerv2.py | 178 ++++++++++++++++++ 2 files changed, 179 insertions(+), 1 deletion(-) create mode 100644 libs/miroflow-tools/src/miroflow_tools/managerv2.py diff --git a/apps/miroflow-agent/src/core/pipeline.py b/apps/miroflow-agent/src/core/pipeline.py index 09478334..9ae3d462 100644 --- a/apps/miroflow-agent/src/core/pipeline.py +++ b/apps/miroflow-agent/src/core/pipeline.py @@ -38,7 +38,7 @@ async def execute_task_pipeline( task_description: str, task_file_name: str, main_agent_tool_manager: ToolManager, - sub_agent_tool_managers: List[Dict[str, ToolManager]], + sub_agent_tool_managers: Dict[str, ToolManager], output_formatter: OutputFormatter, ground_truth: Optional[Any] = None, log_dir: str = "logs", diff --git a/libs/miroflow-tools/src/miroflow_tools/managerv2.py b/libs/miroflow-tools/src/miroflow_tools/managerv2.py new file mode 100644 index 00000000..782015ef --- /dev/null +++ b/libs/miroflow-tools/src/miroflow_tools/managerv2.py @@ -0,0 +1,178 @@ +from contextlib import AsyncExitStack, asynccontextmanager +from typing import Any, Literal, cast + +from mcp import ClientSession +from mcp.client.sse import sse_client +from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.client.streamable_http import streamablehttp_client +from miroflow_tools.manager import ToolManagerProtocol +from pydantic import BaseModel, HttpUrl + + +class ConfigBase(BaseModel): + name: str + + +class StdIOConfig(ConfigBase): + kind: Literal["stdio"] + params: StdioServerParameters + + +class SSEConfig(ConfigBase): + kind: Literal["sse"] + url: HttpUrl + + +class StreamableHttpConfig(ConfigBase): + kind: Literal["streamable_http"] + url: HttpUrl + + +Config = StdIOConfig | SSEConfig | StreamableHttpConfig + + +@asynccontextmanager +async def connect_by_config(cfg: Config): + """ + returns a mcp.ClientSession instance, depending on Config. + """ + async with AsyncExitStack() as stack: + read, write = None, None + if cfg.kind == "stdio": + cfg = cast(StdIOConfig, cfg) + read, write = await stack.enter_async_context(stdio_client(cfg.params)) + elif cfg.kind == "sse": + cfg = cast(SSEConfig, cfg) + read, write = await stack.enter_async_context(sse_client(str(cfg.url))) + elif cfg.kind == "streamable_http": + cfg = cast(StreamableHttpConfig, cfg) + read, write, _ = await stack.enter_async_context( + streamablehttp_client(str(cfg.url)) + ) + else: # type: ignore + raise TypeError("unknown kind {} in cfg".format(cfg.kind)) + if read is not None and write is not None: + session = await stack.enter_async_context(ClientSession(read, write)) + await session.initialize() + yield session + + +class ToolManagerV2(ToolManagerProtocol): + """ + implements a barebone ToolManager. Difference in Version 2: + 1. deprecate huggingface block + browser session (tool name no longer matches). + 2. + """ + + def __init__(self, server_configs: list[Config], logger: Any): + """ + Initialize ToolManager. + :param server_configs: List returned by create_server_parameters() + """ + self.server_dict = {config.name: config for config in server_configs} + self.task_log = None + + def add_log(self, logger: Any): + self.task_log = logger + + def _log(self, level: str, step_name: str, message: str, metadata=None): + """Helper method to log using task_log if available, otherwise skip logging.""" + if self.task_log: + self.task_log.log_step(level, step_name, message, metadata) + + def info(self, step_name: str, message: str): + self._log("info", f"ToolManagerV2 | {step_name}", message) + + def error(self, step_name: str, message: str): + self._log("error", f"ToolManagerV2 | {step_name}", message) + + async def get_all_tool_definitions(self): + """ + Connect to all configured servers and get their tool definitions. + Returns a list suitable for passing to the Prompt generator. + """ + + async def inner_list_tools(session): + try: + response = await session.list_tools() + return response, None + except Exception as e: + return None, e + + final = [] + # Process remote server tools + for server_name, config in self.server_dict.items(): + curr = {"name": server_name, "tools": []} + self.info( + "Get Tool Definitions", + f"Getting tool definitions for server '{server_name}'...", + ) + async with connect_by_config(config) as session: + response, error = await inner_list_tools(session) + if error is not None: + self.error( + "Connection Error", + f"Unable to connect or get tools from server '{server_name}': {str(error)}", + ) + curr["tools"] = [{"error": f"Unable to fetch tools: {str(error)}"}] + if response is not None: + for tool in response.tools: + curr["tools"].append( + { + "name": tool.name, + "description": tool.description, + "schema": tool.inputSchema, + } + ) + final.append(curr) + + return final + + async def execute_tool_call( + self, *, server_name: str, tool_name: str, arguments: dict[str, Any] + ) -> Any: + """ + Execute a single tool call. + :param server_name: Server name + :param tool_name: Tool name + :param arguments: Tool arguments dictionary + :return: Dictionary containing result or error + """ + + def rv(*, error: str | None = None, result: str | None = None): + common = {"server_name": server_name, "tool_name": tool_name} + depends = {"error": error} if error is not None else {"result": result} + return common | depends + + async def inner_call_tool(session, tool_name, arguments): + try: + tool_result = await session.call_tool(tool_name, arguments=arguments) + return tool_result, None + except Exception as e: + return None, e + + config = self.server_dict.get(server_name, None) + if config is None: + self.error( + "Server Not Found", + f"Attempting to call server '{server_name}' not found", + ) + return rv(error=f"Server '{server_name}' not found.") + + self.info( + "Tool Call Start", + f"Connecting to server '{server_name}' to call tool '{tool_name}'", + ) + async with connect_by_config(config) as session: + tool_result, error = await inner_call_tool(session, tool_name, arguments) + if error is not None: + self.error( + "Tool Execution Error", + f"Tool execution error: {error}", + ) + return rv(error=f"Tool execution failed: {str(error)}") + if tool_result is not None: + result_content = ( + tool_result.content[-1].text if tool_result.content else "" + ) + return rv(result=result_content) From 290dd38e9224fba4ccb0f1368fa1c0449b55248b Mon Sep 17 00:00:00 2001 From: Lei Lei Date: Thu, 25 Sep 2025 10:39:08 +0800 Subject: [PATCH 2/7] version 2 of create_mcp_server_parameters(). --- apps/miroflow-agent/src/config/http.py | 23 ++ apps/miroflow-agent/src/config/settings_v2.py | 35 +++ apps/miroflow-agent/src/config/sse.py | 23 ++ apps/miroflow-agent/src/config/stdio.py | 235 ++++++++++++++++++ 4 files changed, 316 insertions(+) create mode 100644 apps/miroflow-agent/src/config/http.py create mode 100644 apps/miroflow-agent/src/config/settings_v2.py create mode 100644 apps/miroflow-agent/src/config/sse.py create mode 100644 apps/miroflow-agent/src/config/stdio.py diff --git a/apps/miroflow-agent/src/config/http.py b/apps/miroflow-agent/src/config/http.py new file mode 100644 index 00000000..1d20dd58 --- /dev/null +++ b/apps/miroflow-agent/src/config/http.py @@ -0,0 +1,23 @@ +from typing import Literal, TypedDict + + +class Config(TypedDict): + name: str + kind: Literal["streamable_http"] + url: str + + +def hydrate_mcp_client_with_streamable_http(tool_list: list[str]) -> list[Config]: + """ + assert all(tool.endswith("-http") for tool in tool_list) + """ + configs: list[Config] = [] + # for tool_name in tool_list: + # if tool_name == "tool-google-search-http": + # config = Config(name=tool_name, kind="streamable_http", url="whatever") + # else: + # print("not supported") + # continue + # configs.append(config) + + return configs diff --git a/apps/miroflow-agent/src/config/settings_v2.py b/apps/miroflow-agent/src/config/settings_v2.py new file mode 100644 index 00000000..c70678ea --- /dev/null +++ b/apps/miroflow-agent/src/config/settings_v2.py @@ -0,0 +1,35 @@ +import os + +from omegaconf import DictConfig + +from .http import hydrate_mcp_client_with_streamable_http +from .sse import hydrate_mcp_client_with_sse_transport +from .stdio import hydrate_mcp_client_with_stdio_transport + + +def create_mcp_server_parameters(cfg: DictConfig, agent_cfg: DictConfig): + os.environ["OPENAI_BASE_URL"] = ( + cfg.llm.get("openai_base_url") or "https://api.openai.com/v1" + ) + os.environ["ANTHROPIC_BASE_URL"] = ( + cfg.llm.get("anthropic_base_url") or "https://api.anthropic.com" + ) + + OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL") + ANTHROPIC_BASE_URL = os.environ.get("ANTHROPIC_BASE_URL") + + tool_list = agent_cfg.get("tools", []) + stdio_configs = hydrate_mcp_client_with_stdio_transport( + tool_list, + anthropic_base_url=ANTHROPIC_BASE_URL, + openai_base_url=OPENAI_BASE_URL, + ) + sse_configs = hydrate_mcp_client_with_sse_transport(tool_list) + http_configs = hydrate_mcp_client_with_streamable_http(tool_list) + + configs = [*stdio_configs, *sse_configs, *http_configs] + + blacklist = set() + for item in agent_cfg.get("tool_blacklist", []): + blacklist.add((item[0], item[1])) + return configs, blacklist diff --git a/apps/miroflow-agent/src/config/sse.py b/apps/miroflow-agent/src/config/sse.py new file mode 100644 index 00000000..6a4e41e3 --- /dev/null +++ b/apps/miroflow-agent/src/config/sse.py @@ -0,0 +1,23 @@ +from typing import Literal, TypedDict + + +class Config(TypedDict): + name: str + kind: Literal["sse"] + url: str + + +def hydrate_mcp_client_with_sse_transport(tool_list: list[str]) -> list[Config]: + """ + assert all(tool.endswith("-sse") for tool in tool_list) + """ + configs: list[Config] = [] + # for tool_name in tool_list: + # if tool_name == "tool-google-search-sse": + # config = Config(name=tool_name, kind="sse", url="whatever") + # else: + # print("not supported") + # continue + # configs.append(config) + + return configs diff --git a/apps/miroflow-agent/src/config/stdio.py b/apps/miroflow-agent/src/config/stdio.py new file mode 100644 index 00000000..8cad0ca3 --- /dev/null +++ b/apps/miroflow-agent/src/config/stdio.py @@ -0,0 +1,235 @@ +import sys +from typing import Literal, TypedDict + +from mcp import StdioServerParameters + +from .settings import ( + ANTHROPIC_API_KEY, + E2B_API_KEY, + JINA_API_KEY, + JINA_BASE_URL, + OPENAI_API_KEY, + REASONING_API_KEY, + REASONING_BASE_URL, + REASONING_MODEL_NAME, + SERPER_API_KEY, + SERPER_BASE_URL, + TENCENTCLOUD_SECRET_ID, + TENCENTCLOUD_SECRET_KEY, + VISION_API_KEY, + VISION_BASE_URL, + VISION_MODEL_NAME, + WHISPER_API_KEY, + WHISPER_BASE_URL, + WHISPER_MODEL_NAME, +) + + +class Config(TypedDict): + name: str + kind: Literal["stdio"] + params: StdioServerParameters + + +def hydrate_mcp_client_with_stdio_transport( + tool_list: list[str], anthropic_base_url: str, openai_base_url: str +) -> list[Config]: + configs: list[Config] = [] + for tool_name in tool_list: + if tool_name == "tool-google-search": + configs.append( + { + "name": "tool-google-search", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=[ + "-m", + "miroflow_tools.mcp_servers.searching_google_mcp_server", + ], + env={ + "SERPER_API_KEY": SERPER_API_KEY, + "SERPER_BASE_URL": SERPER_BASE_URL, + "JINA_API_KEY": JINA_API_KEY, + "JINA_BASE_URL": JINA_BASE_URL, + }, + ), + } + ) + elif tool_name == "tool-sougou-search": + configs.append( + { + "name": "tool-sougou-search", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=[ + "-m", + "miroflow_tools.mcp_servers.searching_sougou_mcp_server", + ], + env={ + "TENCENTCLOUD_SECRET_ID": TENCENTCLOUD_SECRET_ID, + "TENCENTCLOUD_SECRET_KEY": TENCENTCLOUD_SECRET_KEY, + "JINA_API_KEY": JINA_API_KEY, + "JINA_BASE_URL": JINA_BASE_URL, + }, + ), + } + ) + + elif tool_name == "tool-python": + configs.append( + { + "name": "tool-python", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.python_mcp_server"], + env={"E2B_API_KEY": E2B_API_KEY}, + ), + } + ) + elif tool_name == "tool-code": + configs.append( + { + "name": "tool-code", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.python_mcp_server"], + env={"E2B_API_KEY": E2B_API_KEY}, + ), + } + ) + elif tool_name == "tool-vqa": + configs.append( + { + "name": "tool-vqa", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.vision_mcp_server"], + env={ + "ANTHROPIC_API_KEY": ANTHROPIC_API_KEY, + "ANTHROPIC_BASE_URL": anthropic_base_url, + }, + ), + } + ) + elif tool_name == "tool-vqa-os": + configs.append( + { + "name": "tool-vqa-os", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.vision_mcp_server_os"], + env={ + "VISION_API_KEY": VISION_API_KEY, + "VISION_BASE_URL": VISION_BASE_URL, + "VISION_MODEL_NAME": VISION_MODEL_NAME, + }, + ), + } + ) + + elif tool_name == "tool_transcribe": + configs.append( + { + "name": "tool-transcribe", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.audio_mcp_server"], + env={ + "OPENAI_API_KEY": OPENAI_API_KEY, + "OPENAI_BASE_URL": openai_base_url, + }, + ), + } + ) + + elif tool_name == "tool-transcribe-os": + configs.append( + { + "name": "tool-transcribe-os", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.audio_mcp_server_os"], + env={ + "WHISPER_BASE_URL": WHISPER_BASE_URL, + "WHISPER_API_KEY": WHISPER_API_KEY, + "WHISPER_MODEL_NAME": WHISPER_MODEL_NAME, + }, + ), + } + ) + + elif tool_name == "tool-reasoning": + configs.append( + { + "name": "tool-reasoning", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=[ + "-m", + "miroflow_tools.mcp_servers.reasoning_mcp_server", + ], + env={ + "ANTHROPIC_API_KEY": ANTHROPIC_API_KEY, + "ANTHROPIC_BASE_URL": anthropic_base_url, + }, + ), + } + ) + + elif tool_name == "tool-reasoning-os": + configs.append( + { + "name": "tool-reasoning-os", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=[ + "-m", + "miroflow_tools.mcp_servers.reasoning_mcp_server_os", + ], + env={ + "REASONING_API_KEY": REASONING_API_KEY, + "REASONING_BASE_URL": REASONING_BASE_URL, + "REASONING_MODEL_NAME": REASONING_MODEL_NAME, + }, + ), + } + ) + + # reader + elif tool_name == "tool-reader": + configs.append( + { + "name": "tool-reader", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "markitdown_mcp"], + ), + } + ) + + elif tool_name == "tool-reading": + configs.append( + { + "name": "tool-reading", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.reading_mcp_server"], + ), + } + ) + else: + print("not supported") + + return configs From cec9327e7b5082a0fc9294eb801e34f9cbb0169f Mon Sep 17 00:00:00 2001 From: Lei Lei Date: Thu, 25 Sep 2025 10:45:26 +0800 Subject: [PATCH 3/7] rename files. --- .../{managerv2.py => manager_v2.py} | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) rename libs/miroflow-tools/src/miroflow_tools/{managerv2.py => manager_v2.py} (96%) diff --git a/libs/miroflow-tools/src/miroflow_tools/managerv2.py b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py similarity index 96% rename from libs/miroflow-tools/src/miroflow_tools/managerv2.py rename to libs/miroflow-tools/src/miroflow_tools/manager_v2.py index 782015ef..36ef377d 100644 --- a/libs/miroflow-tools/src/miroflow_tools/managerv2.py +++ b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py @@ -1,5 +1,5 @@ from contextlib import AsyncExitStack, asynccontextmanager -from typing import Any, Literal, cast +from typing import Any, Literal, Protocol, cast from mcp import ClientSession from mcp.client.sse import sse_client @@ -57,20 +57,8 @@ async def connect_by_config(cfg: Config): yield session -class ToolManagerV2(ToolManagerProtocol): - """ - implements a barebone ToolManager. Difference in Version 2: - 1. deprecate huggingface block + browser session (tool name no longer matches). - 2. - """ - - def __init__(self, server_configs: list[Config], logger: Any): - """ - Initialize ToolManager. - :param server_configs: List returned by create_server_parameters() - """ - self.server_dict = {config.name: config for config in server_configs} - self.task_log = None +class LoggingMixin(Protocol): + task_log: Any def add_log(self, logger: Any): self.task_log = logger @@ -86,6 +74,21 @@ def info(self, step_name: str, message: str): def error(self, step_name: str, message: str): self._log("error", f"ToolManagerV2 | {step_name}", message) + +class ToolManagerV2(ToolManagerProtocol, LoggingMixin): + """ + implements a barebone ToolManager. Difference in Version 2: + 1. deprecate huggingface block + browser session (tool name no longer matches). + 2. + """ + + def __init__(self, server_configs: list[Config]): + """ + Initialize ToolManager. + :param server_configs: List returned by create_server_parameters() + """ + self.server_dict = {config.name: config for config in server_configs} + async def get_all_tool_definitions(self): """ Connect to all configured servers and get their tool definitions. From a785ca6f78ddb71e884d22581d8e847d147daba7 Mon Sep 17 00:00:00 2001 From: Lei Lei Date: Thu, 25 Sep 2025 11:10:56 +0800 Subject: [PATCH 4/7] add parsing logic. --- .../src/miroflow_tools/manager_v2.py | 102 ++++++++++++------ 1 file changed, 69 insertions(+), 33 deletions(-) diff --git a/libs/miroflow-tools/src/miroflow_tools/manager_v2.py b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py index 36ef377d..49aa1e6d 100644 --- a/libs/miroflow-tools/src/miroflow_tools/manager_v2.py +++ b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py @@ -82,12 +82,24 @@ class ToolManagerV2(ToolManagerProtocol, LoggingMixin): 2. """ - def __init__(self, server_configs: list[Config]): + def __init__(self, server_configs: list[dict[str, Any]]): """ Initialize ToolManager. :param server_configs: List returned by create_server_parameters() """ - self.server_dict = {config.name: config for config in server_configs} + parsed_configs = [] + for config in server_configs: + kind = config.get("kind") + if kind == "stdio": + config = StdIOConfig(**config) + elif kind == "sse": + config = SSEConfig(**config) + elif kind == "streamable_http": + config = StreamableHttpConfig(**config) + else: + raise ValueError(f"unknown kind {kind} in config") + parsed_configs.append(config) + self.server_dict = {config.name: config for config in parsed_configs} async def get_all_tool_definitions(self): """ @@ -95,7 +107,8 @@ async def get_all_tool_definitions(self): Returns a list suitable for passing to the Prompt generator. """ - async def inner_list_tools(session): + async def inner_list_tools(session: ClientSession): + """helper function to reduce indentation level""" try: response = await session.list_tools() return response, None @@ -110,24 +123,35 @@ async def inner_list_tools(session): "Get Tool Definitions", f"Getting tool definitions for server '{server_name}'...", ) - async with connect_by_config(config) as session: - response, error = await inner_list_tools(session) - if error is not None: - self.error( - "Connection Error", - f"Unable to connect or get tools from server '{server_name}': {str(error)}", - ) - curr["tools"] = [{"error": f"Unable to fetch tools: {str(error)}"}] - if response is not None: - for tool in response.tools: - curr["tools"].append( - { - "name": tool.name, - "description": tool.description, - "schema": tool.inputSchema, - } + try: + async with connect_by_config(config) as session: + response, error = await inner_list_tools(session) + if error is not None: + self.error( + "Connection Error", + f"Unable to connect or get tools from server '{server_name}': {str(error)}", ) - final.append(curr) + curr["tools"] = [ + {"error": f"Unable to fetch tools: {str(error)}"} + ] + final.append(curr) + if response is not None: + for tool in response.tools: + curr["tools"].append( + { + "name": tool.name, + "description": tool.description, + "schema": tool.inputSchema, + } + ) + final.append(curr) + except Exception as e: + self.error( + "MCP session Error", + f"MCP session error: {str(e)}", + ) + curr["tools"] = [{"error": f"MCP session error: {str(e)}"}] + final.append(curr) return final @@ -147,7 +171,10 @@ def rv(*, error: str | None = None, result: str | None = None): depends = {"error": error} if error is not None else {"result": result} return common | depends - async def inner_call_tool(session, tool_name, arguments): + async def inner_call_tool( + session: ClientSession, tool_name: str, arguments: dict[str, Any] + ): + """helper function to reduce indentation level""" try: tool_result = await session.call_tool(tool_name, arguments=arguments) return tool_result, None @@ -166,16 +193,25 @@ async def inner_call_tool(session, tool_name, arguments): "Tool Call Start", f"Connecting to server '{server_name}' to call tool '{tool_name}'", ) - async with connect_by_config(config) as session: - tool_result, error = await inner_call_tool(session, tool_name, arguments) - if error is not None: - self.error( - "Tool Execution Error", - f"Tool execution error: {error}", - ) - return rv(error=f"Tool execution failed: {str(error)}") - if tool_result is not None: - result_content = ( - tool_result.content[-1].text if tool_result.content else "" + try: + async with connect_by_config(config) as session: + tool_result, error = await inner_call_tool( + session, tool_name, arguments ) - return rv(result=result_content) + if error is not None: + self.error( + "Tool Execution Error", + f"Tool execution error: {error}", + ) + return rv(error=f"Tool execution failed: {str(error)}") + if tool_result is not None: + result_content = ( + tool_result.content[-1].text if tool_result.content else "" + ) + return rv(result=result_content) + except Exception as e: + self.error( + "MCP Session Error", + f"MCP session error: {e}", + ) + return rv(error=f"MCP session error: {str(e)}") From 4e030df2c4ba53a3ce8d8e2a3ccfcc20ca0856ef Mon Sep 17 00:00:00 2001 From: Lei Lei Date: Thu, 25 Sep 2025 11:14:48 +0800 Subject: [PATCH 5/7] add parsing logic. --- libs/miroflow-tools/src/miroflow_tools/manager_v2.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/libs/miroflow-tools/src/miroflow_tools/manager_v2.py b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py index 49aa1e6d..c5fdb57c 100644 --- a/libs/miroflow-tools/src/miroflow_tools/manager_v2.py +++ b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py @@ -57,7 +57,11 @@ async def connect_by_config(cfg: Config): yield session -class LoggingMixin(Protocol): +class LoggingMixin: + """ + add logging instance (.task_log) and helper functions (info(), error()) to any class. + """ + task_log: Any def add_log(self, logger: Any): @@ -91,11 +95,11 @@ def __init__(self, server_configs: list[dict[str, Any]]): for config in server_configs: kind = config.get("kind") if kind == "stdio": - config = StdIOConfig(**config) + config = StdIOConfig.model_validate(config) elif kind == "sse": - config = SSEConfig(**config) + config = SSEConfig.model_validate(config) elif kind == "streamable_http": - config = StreamableHttpConfig(**config) + config = StreamableHttpConfig.model_validate(config) else: raise ValueError(f"unknown kind {kind} in config") parsed_configs.append(config) From cc8ef8ca93b4c0a57bff99c8c771c6f5d68577fc Mon Sep 17 00:00:00 2001 From: Lei Lei Date: Thu, 25 Sep 2025 11:28:04 +0800 Subject: [PATCH 6/7] move result parsing inside. --- .../src/miroflow_tools/manager_v2.py | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/libs/miroflow-tools/src/miroflow_tools/manager_v2.py b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py index c5fdb57c..96108074 100644 --- a/libs/miroflow-tools/src/miroflow_tools/manager_v2.py +++ b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py @@ -1,10 +1,11 @@ from contextlib import AsyncExitStack, asynccontextmanager -from typing import Any, Literal, Protocol, cast +from typing import Any, Literal, cast from mcp import ClientSession from mcp.client.sse import sse_client from mcp.client.stdio import StdioServerParameters, stdio_client from mcp.client.streamable_http import streamablehttp_client +from mcp.types import TextContent from miroflow_tools.manager import ToolManagerProtocol from pydantic import BaseModel, HttpUrl @@ -62,7 +63,7 @@ class LoggingMixin: add logging instance (.task_log) and helper functions (info(), error()) to any class. """ - task_log: Any + task_log: Any = None def add_log(self, logger: Any): self.task_log = logger @@ -143,7 +144,7 @@ async def inner_list_tools(session: ClientSession): for tool in response.tools: curr["tools"].append( { - "name": tool.name, + "name": tool.name, # type: ignore "description": tool.description, "schema": tool.inputSchema, } @@ -181,7 +182,16 @@ async def inner_call_tool( """helper function to reduce indentation level""" try: tool_result = await session.call_tool(tool_name, arguments=arguments) - return tool_result, None + final = "" + if tool_result is not None: + if ( + getattr(tool_result, "content", None) is not None + and len(tool_result.content) > 0 + ): + block = tool_result.content[-1] + if isinstance(block, TextContent): + final = block.text + return final, None except Exception as e: return None, e @@ -199,7 +209,7 @@ async def inner_call_tool( ) try: async with connect_by_config(config) as session: - tool_result, error = await inner_call_tool( + result_content, error = await inner_call_tool( session, tool_name, arguments ) if error is not None: @@ -208,10 +218,7 @@ async def inner_call_tool( f"Tool execution error: {error}", ) return rv(error=f"Tool execution failed: {str(error)}") - if tool_result is not None: - result_content = ( - tool_result.content[-1].text if tool_result.content else "" - ) + if result_content is not None: return rv(result=result_content) except Exception as e: self.error( From ffc75af2875855a1731b91f48541c714946af9ca Mon Sep 17 00:00:00 2001 From: Lei Lei Date: Tue, 14 Oct 2025 11:07:32 +0800 Subject: [PATCH 7/7] update name and stuff. --- .../src/miroflow_tools/manager_v2.py | 50 ++++++++----------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/libs/miroflow-tools/src/miroflow_tools/manager_v2.py b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py index 96108074..caf8ae2d 100644 --- a/libs/miroflow-tools/src/miroflow_tools/manager_v2.py +++ b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py @@ -33,7 +33,7 @@ class StreamableHttpConfig(ConfigBase): @asynccontextmanager -async def connect_by_config(cfg: Config): +async def connect(cfg: Config): """ returns a mcp.ClientSession instance, depending on Config. """ @@ -83,8 +83,8 @@ def error(self, step_name: str, message: str): class ToolManagerV2(ToolManagerProtocol, LoggingMixin): """ implements a barebone ToolManager. Difference in Version 2: - 1. deprecate huggingface block + browser session (tool name no longer matches). - 2. + 1. Deprecate huggingface block + browser session (tool name no longer matches). + 2. add supports for streamable_http. """ def __init__(self, server_configs: list[dict[str, Any]]): @@ -122,24 +122,23 @@ async def inner_list_tools(session: ClientSession): final = [] # Process remote server tools - for server_name, config in self.server_dict.items(): - curr = {"name": server_name, "tools": []} + for name, config in self.server_dict.items(): self.info( "Get Tool Definitions", - f"Getting tool definitions for server '{server_name}'...", + f"Getting tool definitions for server '{name}'...", ) + curr = {"name": name, "tools": []} try: - async with connect_by_config(config) as session: + async with connect(config) as session: response, error = await inner_list_tools(session) if error is not None: self.error( - "Connection Error", - f"Unable to connect or get tools from server '{server_name}': {str(error)}", + "List Tools Error", + f"Unable to connect or get tools from server '{name}': {str(error)}", ) curr["tools"] = [ {"error": f"Unable to fetch tools: {str(error)}"} ] - final.append(curr) if response is not None: for tool in response.tools: curr["tools"].append( @@ -149,13 +148,10 @@ async def inner_list_tools(session: ClientSession): "schema": tool.inputSchema, } ) - final.append(curr) except Exception as e: - self.error( - "MCP session Error", - f"MCP session error: {str(e)}", - ) + self.error("MCP session Error", f"MCP session error: {str(e)}") curr["tools"] = [{"error": f"MCP session error: {str(e)}"}] + finally: final.append(curr) return final @@ -171,9 +167,9 @@ async def execute_tool_call( :return: Dictionary containing result or error """ - def rv(*, error: str | None = None, result: str | None = None): + def rv(*, exc: str | None = None, res: str | None = None): common = {"server_name": server_name, "tool_name": tool_name} - depends = {"error": error} if error is not None else {"result": result} + depends = {"error": exc} if exc is not None else {"result": res} return common | depends async def inner_call_tool( @@ -201,28 +197,26 @@ async def inner_call_tool( "Server Not Found", f"Attempting to call server '{server_name}' not found", ) - return rv(error=f"Server '{server_name}' not found.") + return rv(exc=f"Server '{server_name}' not found.") self.info( "Tool Call Start", f"Connecting to server '{server_name}' to call tool '{tool_name}'", ) try: - async with connect_by_config(config) as session: - result_content, error = await inner_call_tool( - session, tool_name, arguments - ) - if error is not None: + async with connect(config) as session: + res, exc = await inner_call_tool(session, tool_name, arguments) + if exc is not None: self.error( "Tool Execution Error", - f"Tool execution error: {error}", + f"Tool execution error: {exc}", ) - return rv(error=f"Tool execution failed: {str(error)}") - if result_content is not None: - return rv(result=result_content) + return rv(exc=f"Tool execution failed: {str(exc)}") + if res is not None: + return rv(res=res) except Exception as e: self.error( "MCP Session Error", f"MCP session error: {e}", ) - return rv(error=f"MCP session error: {str(e)}") + return rv(exc=f"MCP session error: {str(e)}")