From 010896e4cb82818f6f86f13357a1d07cc6d754c6 Mon Sep 17 00:00:00 2001 From: cristy Date: Sun, 16 Nov 2025 18:41:56 +0100 Subject: [PATCH 1/4] adds openapi automation example --- examples/openapi_automation/__init__.py | 0 examples/openapi_automation/models.py | 138 ++++++++ .../openapi_automation/openapi_automation.plx | 128 ++++++++ examples/openapi_automation/pipe_runner.py | 41 +++ examples/openapi_automation/pipefuncs.py | 307 ++++++++++++++++++ 5 files changed, 614 insertions(+) create mode 100644 examples/openapi_automation/__init__.py create mode 100644 examples/openapi_automation/models.py create mode 100644 examples/openapi_automation/openapi_automation.plx create mode 100644 examples/openapi_automation/pipe_runner.py create mode 100644 examples/openapi_automation/pipefuncs.py diff --git a/examples/openapi_automation/__init__.py b/examples/openapi_automation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/openapi_automation/models.py b/examples/openapi_automation/models.py new file mode 100644 index 0000000..11fc6a2 --- /dev/null +++ b/examples/openapi_automation/models.py @@ -0,0 +1,138 @@ +from typing import Any, Dict, List, Optional +from pipelex.core.stuffs.structured_content import StructuredContent +from pydantic import BaseModel, Field + +class FunctionParameter(StructuredContent): + name: str = Field(description="parameter name") + value: str = Field(description="parameter value") + type: str = Field(description="parameter type") + + +# OpenAPI Specification Models +class OpenAPIParameter(BaseModel): + name: str + in_: str = Field(alias="in") + required: Optional[bool] = False + description: Optional[str] = None + schema_: Optional[Dict[str, Any]] = Field(default=None, alias="schema") + + +class OpenAPIRequestBody(BaseModel): + description: Optional[str] = None + required: Optional[bool] = False + content: Optional[Dict[str, Any]] = None + + +class OpenAPIResponse(BaseModel): + description: Optional[str] = None + content: Optional[Dict[str, Any]] = None + + +class OpenAPIOperation(BaseModel): + operationId: Optional[str] = None + summary: Optional[str] = None + description: Optional[str] = None + parameters: Optional[List[OpenAPIParameter]] = None + requestBody: Optional[OpenAPIRequestBody] = None + responses: Optional[Dict[str, OpenAPIResponse]] = None + tags: Optional[List[str]] = None + + +class OpenAPIPathItem(BaseModel): + get: Optional[OpenAPIOperation] = None + post: Optional[OpenAPIOperation] = None + put: Optional[OpenAPIOperation] = None + delete: Optional[OpenAPIOperation] = None + patch: Optional[OpenAPIOperation] = None + options: Optional[OpenAPIOperation] = None + head: Optional[OpenAPIOperation] = None + + +class OpenAPIInfo(BaseModel): + title: str + version: str + description: Optional[str] = None + + +class OpenAPISpec(StructuredContent): + openapi: str = Field(description="OpenAPI version") + info: OpenAPIInfo = Field(description="API metadata") + paths: Dict[str, OpenAPIPathItem] = Field(description="API endpoints") + components: Optional[Dict[str, Any]] = Field( + default=None, description="Reusable components" + ) + servers: Optional[List[Dict[str, Any]]] = Field( + default=None, description="API servers" + ) + + +class FunctionInfo(StructuredContent): + function_name: str = Field(description="The operation ID / function name") + description: Optional[str] = Field(default=None, description="Function description") + + +class FunctionChoice(StructuredContent): + explanation: str = Field(description="Explanation of the choice.") + function_name: str = Field(description="Name of the function.") + + +class ParameterDetail(StructuredContent): + """Detailed parameter information for API calls""" + + name: str = Field(description="Parameter name") + param_in: str = Field( + description="Where the parameter goes: path, query, header, cookie" + ) + required: bool = Field( + default=False, description="Whether the parameter is required" + ) + param_type: Optional[str] = Field(default=None, description="Parameter data type") + description: Optional[str] = Field( + default=None, description="Parameter description" + ) + default: Optional[Any] = Field(default=None, description="Default value if any") + + +class FunctionDetails(StructuredContent): + """Complete details needed to make an API request""" + + function_name: str = Field(description="The operation ID / function name") + http_method: str = Field(description="HTTP method (GET, POST, PUT, DELETE, etc.)") + path: str = Field(description="API endpoint path") + description: Optional[str] = Field( + default=None, description="Operation description" + ) + parameters: List[ParameterDetail] = Field( + default_factory=list, description="List of parameters" + ) + request_body_required: bool = Field( + default=False, description="Whether a request body is required" + ) + request_body_schema: Optional[Dict[str, Any]] = Field( + default=None, description="Request body schema if applicable" + ) + tags: Optional[List[str]] = Field(default=None, description="Operation tags") + + +class RequestDetails(StructuredContent): + """Holds the actual parameter values needed to make an API request""" + + function_name: str = Field(description="The operation ID / function name") + http_method: str = Field(description="HTTP method (GET, POST, PUT, DELETE, etc.)") + path: str = Field(description="API endpoint path") + query_parameters: Optional[Dict[str, Any]] = Field( + default=None, description="Query parameters and their values" + ) + path_parameters: Optional[Dict[str, Any]] = Field( + default=None, description="Path parameters and their values" + ) + header_parameters: Optional[Dict[str, Any]] = Field( + default=None, description="Header parameters and their values" + ) + cookie_parameters: Optional[Dict[str, Any]] = Field( + default=None, description="Cookie parameters and their values" + ) + request_body: Optional[Dict[str, Any]] = Field( + default=None, description="Request body data if applicable" + ) + diff --git a/examples/openapi_automation/openapi_automation.plx b/examples/openapi_automation/openapi_automation.plx new file mode 100644 index 0000000..d530402 --- /dev/null +++ b/examples/openapi_automation/openapi_automation.plx @@ -0,0 +1,128 @@ +domain = "openapi_automation" +description = "Building function info from OpenAPI JSON spec" +main_pipe = "build_function_info" + +[concept] +OpenAPISpec = "Structured OpenAPI specification for the backend." +FunctionInfo = "Information about a function in the OpenAPI spec." +FunctionChoice = "Choice of OpenAPI function to accomplish an operation." +RequestDetails = "Request Details containing actual values for the request" + +[concept.OpenAPIURL] +description = "The URL of the OpenAPI JSON spec" +refines = "Text" + +[concept.OperationToAccomplish] +description = "The specific operation to accomplish." +refines = "Text" + +[concept.RelevantOpenapiPaths] +description = """ +Relevant information (e.g., paths, methods) from the OpenAPI JSON specification that pertains to the operation to accomplish. +""" + +[concept.RelevantOpenapiPaths.structure] +paths = { type = "text", description = "List of relevant paths.", required = true } +methods = { type = "text", description = "List of relevant methods.", required = true } + +[concept.FunctionName] +description = "The name of the function." +refines = "Text" + +[concept.FunctionParameter] +description = "The necessary function parameters." + +[concept.FunctionParameter.structure] +name = { type = "text", description = "Name of a function parameter.", required = true } +type = { type = "text", description = "Data type of a function parameter.", required = true } +value = { type = "text", description = "Values of each function parameter.", required = true } + +[concept.ApiResponseResult] +description = "The compiled api response content." + +[concept.ApiResponseResult.structure] +response = { type = "text", description = "The response of the api server", required = true } + + +[pipe.build_function_info] +type = "PipeSequence" +description = """ +Main pipeline that builds the function name and function parameters and values necessary for the task based on the OpenAPI JSON spec and the operation to accomplish. +""" +inputs = { openapi_url = "OpenAPIURL", operation_to_accomplish = "OperationToAccomplish" } +output = "ApiResponseResult" +#output = "FunctionDetails" +steps = [ + { pipe = "obtain_api_spec", result = "openapi_spec"}, + { pipe = "extract_available_functions", result = "function_info" }, + { pipe = "choose_function", result = "function_choice" }, + { pipe = "get_function_details", result = "function_details" }, + { pipe = "prepare_request", result = "request_details"}, + { pipe = "execute_api_call", result = "result_api_call" }, +] + +[pipe.obtain_api_spec] +type = "PipeFunc" +description = "Obtains the OpenAPI spec given a URL." +inputs = { openapi_url = "OpenAPIURL" } +output = "OpenAPISpec" +function_name = "obtain_openapi_model" + +[pipe.extract_available_functions] +type = "PipeFunc" +description = "Extracts the available functions from the OpenAPI spec." +inputs = { openapi_url = "OpenAPIURL" } +output = "FunctionInfo[]" +function_name = "extract_available_functions" + + +[pipe.choose_function] +type = "PipeLLM" +description = "Uses the operation to accomplish and relevant OpenAPI paths to determine the function name." +inputs = { operation_to_accomplish = "OperationToAccomplish", function_info = "FunctionInfo[]" } +output = "FunctionChoice" +model = "llm_to_engineer" +system_prompt = """ +Determine a function name based on the operation to accomplish and relevant OpenAPI paths. Be concise. +""" +prompt = """ +Based on the operation to accomplish and the available OpenAPI functions, choose the relevant function name. + +@operation_to_accomplish + +@function_info +""" + +[pipe.get_function_details] +type = "PipeFunc" +description = "Gets the details of a function from the OpenAPI spec." +inputs = { function_choice = "FunctionChoice" } +output = "FunctionDetails" +function_name = "get_function_details" + + + +[pipe.prepare_request] +type = "PipeLLM" +description = "Prepares the request body corresponding to the actual request" +inputs = { operation_to_accomplish = "OperationToAccomplish", function_details = "FunctionDetails" } +output = "RequestDetails" +model = "llm_to_engineer" +prompt = """ +Based on the operation to accomplish and the available OpenAPI functions, fill in the actual values for the current request. + +@operation_to_accomplish + +@function_details + +""" + + +[pipe.execute_api_call] +type = "PipeFunc" +description = "Execute the API request given a CompiledFunctionInfo." +inputs = { request_details = "RequestDetails" } +output = "ApiResponseResult" +function_name = "invoke_function_api_backend" + + diff --git a/examples/openapi_automation/pipe_runner.py b/examples/openapi_automation/pipe_runner.py new file mode 100644 index 0000000..ae42140 --- /dev/null +++ b/examples/openapi_automation/pipe_runner.py @@ -0,0 +1,41 @@ +import asyncio + +from pipelex.pipelex import Pipelex +from pipelex.pipeline.execute import execute_pipeline + +spec_url_1 = """ +https://developer.keap.com/docs/rest/2025-11-05-v1.json +""" + + +spec_url_2 = """ +https://petstore3.swagger.io/api/v3/openapi.json +""" + +USE_CASE_1 = (spec_url_1, "extract all the contacts with email pincopalla@gmail.com") +USE_CASE_2 = (spec_url_2, "get me the user with username: johndoe") + + +async def run_build_function_info(use_case: tuple[str, str]): + spec_url, operation_to_accomplish = use_case + return await execute_pipeline( + pipe_code="build_function_info", + inputs={ + "openapi_url": { + "concept": "openapi_function_builder.OpenAPIURL", + "content": spec_url, + }, + "operation_to_accomplish": { + "concept": "openapi_function_builder.OperationToAccomplish", + "content": operation_to_accomplish, + }, + }, + ) + + +if __name__ == "__main__": + # Initialize Pipelex + Pipelex.make() + + # Run the pipeline + result = asyncio.run(run_build_function_info(use_case=USE_CASE_2)) diff --git a/examples/openapi_automation/pipefuncs.py b/examples/openapi_automation/pipefuncs.py new file mode 100644 index 0000000..c774b4c --- /dev/null +++ b/examples/openapi_automation/pipefuncs.py @@ -0,0 +1,307 @@ +import json +from urllib.parse import urlparse + +import requests +from openapiclient import OpenAPIClient +from pipelex.core.memory.working_memory import WorkingMemory +from pipelex.core.stuffs.list_content import ListContent +from pipelex.core.stuffs.text_content import TextContent +from pipelex.system.registries.func_registry import pipe_func + +from examples.openapi_automation.models import * + + +@pipe_func() +async def invoke_function_api_backend(working_memory: WorkingMemory) -> TextContent: + """ + Execute an API request using the RequestDetails struct. + Builds and performs the actual HTTP request using the requests library. + """ + # Get the base URL from the OpenAPI spec + openapi_url = working_memory.get_stuff_as_text("openapi_url").text.strip() + request_details = working_memory.get_stuff_as("request_details", RequestDetails) + + # Get the base URL from the OpenAPI spec + response = requests.get(url=openapi_url) + spec_data = response.json() + + # Extract base URL from servers + base_url = None + if "servers" in spec_data and len(spec_data["servers"]) > 0: + base_url = spec_data["servers"][0].get("url", "") + + if not base_url or base_url == "": + raise ValueError("No server URL found in OpenAPI specification") + + # Validate that base_url is a proper URL with protocol and host + parsed_url = urlparse(base_url) + if not parsed_url.scheme: + raise ValueError(f"Base URL missing protocol (http/https): {base_url}") + if not parsed_url.netloc: + raise ValueError(f"Base URL missing host: {base_url}") + if parsed_url.scheme not in ["http", "https"]: + raise ValueError(f"Base URL protocol must be http or https, got: {parsed_url.scheme}") + + print(f"Base URL: {base_url}") + print(f" Protocol: {parsed_url.scheme}") + print(f" Host: {parsed_url.hostname}") + print(f" Port: {parsed_url.port if parsed_url.port else 'default'}") + # Build the full URL with path parameters + url_path = request_details.path + if request_details.path_parameters: + for param_name, param_value in request_details.path_parameters.items(): + url_path = url_path.replace(f"{{{param_name}}}", str(param_value)) + + full_url = f"{base_url.rstrip('/')}/{url_path.lstrip('/')}" + + # Prepare request components + headers = {} + if request_details.header_parameters: + headers.update(request_details.header_parameters) + + cookies = {} + if request_details.cookie_parameters: + cookies.update(request_details.cookie_parameters) + + params = {} + if request_details.query_parameters: + params.update(request_details.query_parameters) + + # Prepare request body + json_body = None + if request_details.request_body: + json_body = request_details.request_body + + # Execute the HTTP request + print(f"Executing {request_details.http_method} request to {full_url}") + print(f"Query params: {params}") + print(f"Headers: {headers}") + print(f"Body: {json_body}") + + try: + http_response = requests.request( + method=request_details.http_method, + url=full_url, + params=params if params else None, + headers=headers if headers else None, + cookies=cookies if cookies else None, + json=json_body if json_body else None, + ) + + # Raise an exception for HTTP errors + http_response.raise_for_status() + + # Try to parse JSON response, fallback to text + try: + result = http_response.json() + return TextContent(text=json.dumps(result, indent=2)) + except json.JSONDecodeError: + return TextContent(text=http_response.text) + + except requests.exceptions.RequestException as e: + error_msg = f"Request failed: {str(e)}" + if hasattr(e, 'response') and e.response is not None: + error_msg += f"\nStatus code: {e.response.status_code}" + error_msg += f"\nResponse: {e.response.text}" + print(error_msg) + return TextContent(text=error_msg) + + +@pipe_func() +async def obtain_openapi_spec(working_memory: WorkingMemory) -> TextContent: + openapi_url = working_memory.get_stuff_as_text("openapi_url").text.strip() + response = requests.get(url=openapi_url) + spec_data = response.json() + + api = OpenAPIClient(definition=openapi_url) + + # Use the async client with context manager + async with api.AsyncClient() as client: + # Build detailed function signatures from OpenAPI spec + functions_detail = [] + + # Parse the OpenAPI spec to extract function signatures + if "paths" in spec_data: + for path, methods in spec_data["paths"].items(): + for method, operation in methods.items(): + if method.lower() not in [ + "get", + "post", + "put", + "delete", + "patch", + "options", + "head", + ]: + continue + + operation_id = operation.get("operationId") + if not operation_id: + continue + + params = [] + + # Extract parameters + if "parameters" in operation: + for param in operation["parameters"]: + param_name = param.get("name", "unknown") + param_required = param.get("required", False) + + if param_required: + params.append(f"{param_name}") + else: + params.append(f"{param_name}=None") + + # Check for request body + if "requestBody" in operation: + params.append("body=" + json.dumps(operation["requestBody"])) + + params_str = ", ".join(params) if params else "" + functions_detail.append(f"{operation_id}({params_str})") + + # Fallback: just list function names + if not functions_detail: + for func_name in client.functions.keys(): + functions_detail.append(f"{func_name}(**kwargs)") + + functions_text = "\n".join(functions_detail) + spec = f"\n\nAvailable functions:\n{functions_text}" + + return TextContent(text=spec) + + +@pipe_func() +async def obtain_openapi_model(working_memory: WorkingMemory) -> OpenAPISpec: + """ + Fetch and parse OpenAPI specification into a structured Pydantic model. + + Returns: + OpenAPISpec: Structured representation of the OpenAPI specification + """ + openapi_url = working_memory.get_stuff_as_text("openapi_url").text.strip() + response = requests.get(url=openapi_url) + spec_data = response.json() + + # Parse the raw JSON into our Pydantic model + # The model will validate and structure the data + openapi_spec = OpenAPISpec(**spec_data) + + return openapi_spec + + +@pipe_func() +async def extract_available_functions( + working_memory: WorkingMemory, +) -> ListContent[FunctionInfo]: + """ + Extract available functions from OpenAPI spec as a list of FunctionInfo objects. + + Returns: + ListContent: List of FunctionInfo objects with function_name and description + """ + openapi_url = working_memory.get_stuff_as_text("openapi_url").text.strip() + response = requests.get(url=openapi_url) + spec_data = response.json() + + functions: List[FunctionInfo] = [] + + # Parse the OpenAPI spec to extract function names and descriptions + if "paths" in spec_data: + for path, methods in spec_data["paths"].items(): + for method, operation in methods.items(): + if method.lower() not in [ + "get", + "post", + "put", + "delete", + "patch", + "options", + "head", + ]: + continue + + operation_id = operation.get("operationId") + if not operation_id: + continue + + # Get description from summary or description field + description = operation.get("summary") or operation.get("description") + + functions.append( + FunctionInfo(function_name=operation_id, description=description) + ) + + # Convert to ListContent with FunctionInfo items + return ListContent(items=functions) + + +@pipe_func() +async def get_function_details(working_memory: WorkingMemory) -> FunctionDetails: + """ + Get detailed information about a specific function from the OpenAPI spec. + This includes HTTP method, path, parameters, and request body schema. + + Returns: + FunctionDetails: Complete details needed to make the API request + """ + # Get the structured OpenAPISpec from working memory + openapi_spec = working_memory.get_stuff_as("openapi_spec", OpenAPISpec) + function_choice = working_memory.get_stuff_as("function_choice", FunctionChoice) + function_name = function_choice.function_name + + # Search for the function in the OpenAPI spec + for path, path_item in openapi_spec.paths.items(): + # Check each HTTP method + for method_name in ["get", "post", "put", "delete", "patch", "options", "head"]: + operation = getattr(path_item, method_name, None) + + if operation and operation.operationId == function_name: + # Found the function! Extract all details + parameters: List[ParameterDetail] = [] + + # Extract parameters from the structured model + if operation.parameters: + for param in operation.parameters: + param_type = None + param_default = None + if param.schema_: + param_type = param.schema_.get("type") + param_default = param.schema_.get("default") + + parameters.append( + ParameterDetail( + name=param.name, + param_in=param.in_, + required=param.required or False, + param_type=param_type, + description=param.description, + default=param_default, + ) + ) + + # Extract request body information + request_body_required = False + request_body_schema = None + if operation.requestBody: + request_body_required = operation.requestBody.required or False + request_body_schema = operation.requestBody.content + + # Get description (prefer summary over description) + description = operation.summary or operation.description + + # Get tags + tags = operation.tags + + return FunctionDetails( + function_name=function_name, + http_method=method_name.upper(), + path=path, + description=description, + parameters=parameters, + request_body_required=request_body_required, + request_body_schema=request_body_schema, + tags=tags, + ) + + # Function not found + raise ValueError(f"Function '{function_name}' not found in OpenAPI specification") From 0ecd37b0cc553c171478f3db4d0da9007407d4cf Mon Sep 17 00:00:00 2001 From: cristy Date: Sun, 16 Nov 2025 18:42:46 +0100 Subject: [PATCH 2/4] adds openapi automation dependency --- pyproject.toml | 1 + uv.lock | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 7a310ec..a6935f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ dev = [ "types-networkx>=3.3.0.20241020", "types-openpyxl>=3.1.5.20250306", "types-PyYAML>=6.0.12.20250326", + "openapi-httpx-client==0.4.3" ] [build-system] diff --git a/uv.lock b/uv.lock index c960596..fde5445 100644 --- a/uv.lock +++ b/uv.lock @@ -1377,6 +1377,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1d/2a/7dd3d207ec669cacc1f186fd856a0f61dbc255d24f6fdc1a6715d6051b0f/openai-1.109.1-py3-none-any.whl", hash = "sha256:6bcaf57086cf59159b8e27447e4e7dd019db5d29a438072fbd49c290c7e65315", size = 948627, upload-time = "2025-09-24T13:00:50.754Z" }, ] +[[package]] +name = "openapi-httpx-client" +version = "0.4.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "httpx" }, + { name = "pyyaml" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f5/6f/a6adbb463aa71a0a1c1ce4c0e49b94215f8b885982fa970343a5d094825f/openapi_httpx_client-0.4.3.tar.gz", hash = "sha256:062f68056b679a3cb04a0b86f304388e58980acb4d335903dc595d5286a75367", size = 8594, upload-time = "2025-10-30T03:04:10.738Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fa/d0/cf7c7963e384ba2aa83c50b842a9667d86934e22b25d2c7ceb9dd2ce88b6/openapi_httpx_client-0.4.3-py3-none-any.whl", hash = "sha256:24219a9d2052f03fdb6561bad5005c2dd31bfae706935798b30cd49e6e8607da", size = 9006, upload-time = "2025-10-30T03:04:09.63Z" }, +] + [[package]] name = "packaging" version = "25.0" @@ -1558,7 +1571,7 @@ mistralai = [ [[package]] name = "pipelex-cookbook" -version = "0.9.0" +version = "0.9.1" source = { editable = "." } dependencies = [ { name = "beautifulsoup4" }, @@ -1572,6 +1585,7 @@ compat = [ dev = [ { name = "boto3-stubs" }, { name = "mypy" }, + { name = "openapi-httpx-client" }, { name = "pyright" }, { name = "pytest" }, { name = "pytest-asyncio" }, @@ -1595,6 +1609,7 @@ requires-dist = [ { name = "beautifulsoup4", specifier = "==4.13.4" }, { name = "boto3-stubs", marker = "extra == 'dev'", specifier = ">=1.35.24" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.11.2" }, + { name = "openapi-httpx-client", marker = "extra == 'dev'", specifier = "==0.4.3" }, { name = "pipelex", extras = ["mistralai", "anthropic", "google", "google-genai", "bedrock", "fal"], specifier = "==0.15.4" }, { name = "pyright", marker = "extra == 'dev'", specifier = ">=1.1.405" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=0.9.1" }, From 18e44a1abcbd49107ac9d8fb0889abb5edc1d543 Mon Sep 17 00:00:00 2001 From: cristy Date: Tue, 18 Nov 2025 11:11:20 +0100 Subject: [PATCH 3/4] fixes lint errors and formatting errors --- examples/openapi_automation/models.py | 59 +++++++----------------- examples/openapi_automation/pipefuncs.py | 9 ++-- 2 files changed, 20 insertions(+), 48 deletions(-) diff --git a/examples/openapi_automation/models.py b/examples/openapi_automation/models.py index 11fc6a2..eebcc98 100644 --- a/examples/openapi_automation/models.py +++ b/examples/openapi_automation/models.py @@ -1,7 +1,9 @@ from typing import Any, Dict, List, Optional + from pipelex.core.stuffs.structured_content import StructuredContent from pydantic import BaseModel, Field + class FunctionParameter(StructuredContent): name: str = Field(description="parameter name") value: str = Field(description="parameter value") @@ -58,12 +60,8 @@ class OpenAPISpec(StructuredContent): openapi: str = Field(description="OpenAPI version") info: OpenAPIInfo = Field(description="API metadata") paths: Dict[str, OpenAPIPathItem] = Field(description="API endpoints") - components: Optional[Dict[str, Any]] = Field( - default=None, description="Reusable components" - ) - servers: Optional[List[Dict[str, Any]]] = Field( - default=None, description="API servers" - ) + components: Optional[Dict[str, Any]] = Field(default=None, description="Reusable components") + servers: Optional[List[Dict[str, Any]]] = Field(default=None, description="API servers") class FunctionInfo(StructuredContent): @@ -80,16 +78,10 @@ class ParameterDetail(StructuredContent): """Detailed parameter information for API calls""" name: str = Field(description="Parameter name") - param_in: str = Field( - description="Where the parameter goes: path, query, header, cookie" - ) - required: bool = Field( - default=False, description="Whether the parameter is required" - ) + param_in: str = Field(description="Where the parameter goes: path, query, header, cookie") + required: bool = Field(default=False, description="Whether the parameter is required") param_type: Optional[str] = Field(default=None, description="Parameter data type") - description: Optional[str] = Field( - default=None, description="Parameter description" - ) + description: Optional[str] = Field(default=None, description="Parameter description") default: Optional[Any] = Field(default=None, description="Default value if any") @@ -99,18 +91,10 @@ class FunctionDetails(StructuredContent): function_name: str = Field(description="The operation ID / function name") http_method: str = Field(description="HTTP method (GET, POST, PUT, DELETE, etc.)") path: str = Field(description="API endpoint path") - description: Optional[str] = Field( - default=None, description="Operation description" - ) - parameters: List[ParameterDetail] = Field( - default_factory=list, description="List of parameters" - ) - request_body_required: bool = Field( - default=False, description="Whether a request body is required" - ) - request_body_schema: Optional[Dict[str, Any]] = Field( - default=None, description="Request body schema if applicable" - ) + description: Optional[str] = Field(default=None, description="Operation description") + parameters: List[ParameterDetail] = Field(default_factory=list, description="List of parameters") + request_body_required: bool = Field(default=False, description="Whether a request body is required") + request_body_schema: Optional[Dict[str, Any]] = Field(default=None, description="Request body schema if applicable") tags: Optional[List[str]] = Field(default=None, description="Operation tags") @@ -120,19 +104,8 @@ class RequestDetails(StructuredContent): function_name: str = Field(description="The operation ID / function name") http_method: str = Field(description="HTTP method (GET, POST, PUT, DELETE, etc.)") path: str = Field(description="API endpoint path") - query_parameters: Optional[Dict[str, Any]] = Field( - default=None, description="Query parameters and their values" - ) - path_parameters: Optional[Dict[str, Any]] = Field( - default=None, description="Path parameters and their values" - ) - header_parameters: Optional[Dict[str, Any]] = Field( - default=None, description="Header parameters and their values" - ) - cookie_parameters: Optional[Dict[str, Any]] = Field( - default=None, description="Cookie parameters and their values" - ) - request_body: Optional[Dict[str, Any]] = Field( - default=None, description="Request body data if applicable" - ) - + query_parameters: Optional[Dict[str, Any]] = Field(default=None, description="Query parameters and their values") + path_parameters: Optional[Dict[str, Any]] = Field(default=None, description="Path parameters and their values") + header_parameters: Optional[Dict[str, Any]] = Field(default=None, description="Header parameters and their values") + cookie_parameters: Optional[Dict[str, Any]] = Field(default=None, description="Cookie parameters and their values") + request_body: Optional[Dict[str, Any]] = Field(default=None, description="Request body data if applicable") diff --git a/examples/openapi_automation/pipefuncs.py b/examples/openapi_automation/pipefuncs.py index c774b4c..821bcdf 100644 --- a/examples/openapi_automation/pipefuncs.py +++ b/examples/openapi_automation/pipefuncs.py @@ -1,4 +1,5 @@ import json +from typing import List from urllib.parse import urlparse import requests @@ -8,7 +9,7 @@ from pipelex.core.stuffs.text_content import TextContent from pipelex.system.registries.func_registry import pipe_func -from examples.openapi_automation.models import * +from examples.openapi_automation.models import FunctionChoice, FunctionDetails, FunctionInfo, OpenAPISpec, ParameterDetail, RequestDetails @pipe_func() @@ -100,7 +101,7 @@ async def invoke_function_api_backend(working_memory: WorkingMemory) -> TextCont except requests.exceptions.RequestException as e: error_msg = f"Request failed: {str(e)}" - if hasattr(e, 'response') and e.response is not None: + if hasattr(e, "response") and e.response is not None: error_msg += f"\nStatus code: {e.response.status_code}" error_msg += f"\nResponse: {e.response.text}" print(error_msg) @@ -227,9 +228,7 @@ async def extract_available_functions( # Get description from summary or description field description = operation.get("summary") or operation.get("description") - functions.append( - FunctionInfo(function_name=operation_id, description=description) - ) + functions.append(FunctionInfo(function_name=operation_id, description=description)) # Convert to ListContent with FunctionInfo items return ListContent(items=functions) From ef61e3afe904f0c410becba15fc57ea5790840c0 Mon Sep 17 00:00:00 2001 From: cristy Date: Sun, 23 Nov 2025 21:36:18 +0100 Subject: [PATCH 4/4] fixes the merge-check-pyright by adding the necessary types to the variables --- examples/openapi_automation/models.py | 2 +- examples/openapi_automation/pipefuncs.py | 99 ++++++++++++------------ 2 files changed, 51 insertions(+), 50 deletions(-) diff --git a/examples/openapi_automation/models.py b/examples/openapi_automation/models.py index eebcc98..8dbd748 100644 --- a/examples/openapi_automation/models.py +++ b/examples/openapi_automation/models.py @@ -92,7 +92,7 @@ class FunctionDetails(StructuredContent): http_method: str = Field(description="HTTP method (GET, POST, PUT, DELETE, etc.)") path: str = Field(description="API endpoint path") description: Optional[str] = Field(default=None, description="Operation description") - parameters: List[ParameterDetail] = Field(default_factory=list, description="List of parameters") + parameters: List[ParameterDetail] = Field(default_factory=list, description="List of parameters") # type: ignore[reportUnknownVariableType] request_body_required: bool = Field(default=False, description="Whether a request body is required") request_body_schema: Optional[Dict[str, Any]] = Field(default=None, description="Request body schema if applicable") tags: Optional[List[str]] = Field(default=None, description="Operation tags") diff --git a/examples/openapi_automation/pipefuncs.py b/examples/openapi_automation/pipefuncs.py index 821bcdf..3d522ae 100644 --- a/examples/openapi_automation/pipefuncs.py +++ b/examples/openapi_automation/pipefuncs.py @@ -1,5 +1,5 @@ import json -from typing import List +from typing import Any, List from urllib.parse import urlparse import requests @@ -19,15 +19,15 @@ async def invoke_function_api_backend(working_memory: WorkingMemory) -> TextCont Builds and performs the actual HTTP request using the requests library. """ # Get the base URL from the OpenAPI spec - openapi_url = working_memory.get_stuff_as_text("openapi_url").text.strip() - request_details = working_memory.get_stuff_as("request_details", RequestDetails) + openapi_url: str = working_memory.get_stuff_as_text("openapi_url").text.strip() + request_details: RequestDetails = working_memory.get_stuff_as("request_details", RequestDetails) # Get the base URL from the OpenAPI spec - response = requests.get(url=openapi_url) - spec_data = response.json() + response: requests.Response = requests.get(url=openapi_url) + spec_data: dict[str, Any] = response.json() # Extract base URL from servers - base_url = None + base_url: str | None = None if "servers" in spec_data and len(spec_data["servers"]) > 0: base_url = spec_data["servers"][0].get("url", "") @@ -48,28 +48,28 @@ async def invoke_function_api_backend(working_memory: WorkingMemory) -> TextCont print(f" Host: {parsed_url.hostname}") print(f" Port: {parsed_url.port if parsed_url.port else 'default'}") # Build the full URL with path parameters - url_path = request_details.path + url_path: str = request_details.path if request_details.path_parameters: for param_name, param_value in request_details.path_parameters.items(): url_path = url_path.replace(f"{{{param_name}}}", str(param_value)) - full_url = f"{base_url.rstrip('/')}/{url_path.lstrip('/')}" + full_url: str = f"{base_url.rstrip('/')}/{url_path.lstrip('/')}" # Prepare request components - headers = {} + headers: dict[str, Any] = {} if request_details.header_parameters: headers.update(request_details.header_parameters) - cookies = {} + cookies: dict[str, Any] = {} if request_details.cookie_parameters: cookies.update(request_details.cookie_parameters) - params = {} + params: dict[str, Any] = {} if request_details.query_parameters: params.update(request_details.query_parameters) # Prepare request body - json_body = None + json_body: dict[str, Any] | None = None if request_details.request_body: json_body = request_details.request_body @@ -80,7 +80,7 @@ async def invoke_function_api_backend(working_memory: WorkingMemory) -> TextCont print(f"Body: {json_body}") try: - http_response = requests.request( + http_response: requests.Response = requests.request( method=request_details.http_method, url=full_url, params=params if params else None, @@ -94,13 +94,13 @@ async def invoke_function_api_backend(working_memory: WorkingMemory) -> TextCont # Try to parse JSON response, fallback to text try: - result = http_response.json() + result: Any = http_response.json() return TextContent(text=json.dumps(result, indent=2)) except json.JSONDecodeError: return TextContent(text=http_response.text) except requests.exceptions.RequestException as e: - error_msg = f"Request failed: {str(e)}" + error_msg: str = f"Request failed: {str(e)}" if hasattr(e, "response") and e.response is not None: error_msg += f"\nStatus code: {e.response.status_code}" error_msg += f"\nResponse: {e.response.text}" @@ -110,20 +110,20 @@ async def invoke_function_api_backend(working_memory: WorkingMemory) -> TextCont @pipe_func() async def obtain_openapi_spec(working_memory: WorkingMemory) -> TextContent: - openapi_url = working_memory.get_stuff_as_text("openapi_url").text.strip() - response = requests.get(url=openapi_url) - spec_data = response.json() + openapi_url: str = working_memory.get_stuff_as_text("openapi_url").text.strip() + response: requests.Response = requests.get(url=openapi_url) + spec_data: dict[str, Any] = response.json() - api = OpenAPIClient(definition=openapi_url) + api: OpenAPIClient = OpenAPIClient(definition=openapi_url) # Use the async client with context manager - async with api.AsyncClient() as client: + async with api.AsyncClient() as client: # type: ignore[reportUnknownMemberType] # Build detailed function signatures from OpenAPI spec - functions_detail = [] + functions_detail: list[str] = [] # Parse the OpenAPI spec to extract function signatures if "paths" in spec_data: - for path, methods in spec_data["paths"].items(): + for _path, methods in spec_data["paths"].items(): for method, operation in methods.items(): if method.lower() not in [ "get", @@ -136,17 +136,17 @@ async def obtain_openapi_spec(working_memory: WorkingMemory) -> TextContent: ]: continue - operation_id = operation.get("operationId") + operation_id: str | None = operation.get("operationId") if not operation_id: continue - params = [] + params: list[str] = [] # Extract parameters if "parameters" in operation: for param in operation["parameters"]: - param_name = param.get("name", "unknown") - param_required = param.get("required", False) + param_name: str = param.get("name", "unknown") + param_required: bool = param.get("required", False) if param_required: params.append(f"{param_name}") @@ -157,16 +157,17 @@ async def obtain_openapi_spec(working_memory: WorkingMemory) -> TextContent: if "requestBody" in operation: params.append("body=" + json.dumps(operation["requestBody"])) - params_str = ", ".join(params) if params else "" + params_str: str = ", ".join(params) if params else "" functions_detail.append(f"{operation_id}({params_str})") # Fallback: just list function names if not functions_detail: - for func_name in client.functions.keys(): + func_name: str + for func_name in client.functions.keys(): # type: ignore[reportUnknownMemberType, reportUnknownVariableType] functions_detail.append(f"{func_name}(**kwargs)") - functions_text = "\n".join(functions_detail) - spec = f"\n\nAvailable functions:\n{functions_text}" + functions_text: str = "\n".join(functions_detail) + spec: str = f"\n\nAvailable functions:\n{functions_text}" return TextContent(text=spec) @@ -179,13 +180,13 @@ async def obtain_openapi_model(working_memory: WorkingMemory) -> OpenAPISpec: Returns: OpenAPISpec: Structured representation of the OpenAPI specification """ - openapi_url = working_memory.get_stuff_as_text("openapi_url").text.strip() - response = requests.get(url=openapi_url) - spec_data = response.json() + openapi_url: str = working_memory.get_stuff_as_text("openapi_url").text.strip() + response: requests.Response = requests.get(url=openapi_url) + spec_data: dict[str, Any] = response.json() # Parse the raw JSON into our Pydantic model # The model will validate and structure the data - openapi_spec = OpenAPISpec(**spec_data) + openapi_spec: OpenAPISpec = OpenAPISpec(**spec_data) return openapi_spec @@ -200,15 +201,15 @@ async def extract_available_functions( Returns: ListContent: List of FunctionInfo objects with function_name and description """ - openapi_url = working_memory.get_stuff_as_text("openapi_url").text.strip() - response = requests.get(url=openapi_url) - spec_data = response.json() + openapi_url: str = working_memory.get_stuff_as_text("openapi_url").text.strip() + response: requests.Response = requests.get(url=openapi_url) + spec_data: dict[str, Any] = response.json() functions: List[FunctionInfo] = [] # Parse the OpenAPI spec to extract function names and descriptions if "paths" in spec_data: - for path, methods in spec_data["paths"].items(): + for _path, methods in spec_data["paths"].items(): for method, operation in methods.items(): if method.lower() not in [ "get", @@ -221,12 +222,12 @@ async def extract_available_functions( ]: continue - operation_id = operation.get("operationId") + operation_id: str | None = operation.get("operationId") if not operation_id: continue # Get description from summary or description field - description = operation.get("summary") or operation.get("description") + description: str | None = operation.get("summary") or operation.get("description") functions.append(FunctionInfo(function_name=operation_id, description=description)) @@ -244,9 +245,9 @@ async def get_function_details(working_memory: WorkingMemory) -> FunctionDetails FunctionDetails: Complete details needed to make the API request """ # Get the structured OpenAPISpec from working memory - openapi_spec = working_memory.get_stuff_as("openapi_spec", OpenAPISpec) - function_choice = working_memory.get_stuff_as("function_choice", FunctionChoice) - function_name = function_choice.function_name + openapi_spec: OpenAPISpec = working_memory.get_stuff_as("openapi_spec", OpenAPISpec) + function_choice: FunctionChoice = working_memory.get_stuff_as("function_choice", FunctionChoice) + function_name: str = function_choice.function_name # Search for the function in the OpenAPI spec for path, path_item in openapi_spec.paths.items(): @@ -261,8 +262,8 @@ async def get_function_details(working_memory: WorkingMemory) -> FunctionDetails # Extract parameters from the structured model if operation.parameters: for param in operation.parameters: - param_type = None - param_default = None + param_type: str | None = None + param_default: Any = None if param.schema_: param_type = param.schema_.get("type") param_default = param.schema_.get("default") @@ -279,17 +280,17 @@ async def get_function_details(working_memory: WorkingMemory) -> FunctionDetails ) # Extract request body information - request_body_required = False - request_body_schema = None + request_body_required: bool = False + request_body_schema: Any = None if operation.requestBody: request_body_required = operation.requestBody.required or False request_body_schema = operation.requestBody.content # Get description (prefer summary over description) - description = operation.summary or operation.description + description: str | None = operation.summary or operation.description # Get tags - tags = operation.tags + tags: list[str] | None = operation.tags return FunctionDetails( function_name=function_name,