diff --git a/examples/google_adk_example.py b/examples/google_adk_example.py new file mode 100644 index 0000000..ed9ff39 --- /dev/null +++ b/examples/google_adk_example.py @@ -0,0 +1,139 @@ +"""Example usage of GoogleADKRunner with the Google Agent Development Kit. + +This example demonstrates how to: +1. Create a Google ADK agent with tools +2. Use GoogleADKRunner for execution +3. Track activity with the built-in report_status tool +4. Control execution limits using RunConfig + +Requirements: + pip install google-adk agentexec + +Note: You'll need to set up Google Cloud credentials for this to work. +Set the GOOGLE_API_KEY environment variable or configure ADC. +""" + +import asyncio +import uuid + +from google.adk.agents import LlmAgent +from google.adk.core.run_config import RunConfig + +import agentexec as ax + + +# Example tool for the agent +def search_company_info(company_name: str) -> str: + """Search for basic information about a company. + + Args: + company_name: Name of the company to search for. + + Returns: + Basic company information. + """ + # This is a mock implementation + return f"Company: {company_name}\nIndustry: Technology\nFounded: 2020\nEmployees: 500" + + +async def main(): + """Run a simple Google ADK agent with activity tracking.""" + + # Generate a unique agent ID for tracking + agent_id = uuid.uuid4() + + # Create the runner with activity tracking + runner = ax.GoogleADKRunner( + agent_id=agent_id, + app_name="company_research", + report_status_prompt="Use report_activity(message, percentage) to report your progress.", + ) + + # Create a Google ADK agent + # Note: Include the runner's prompts and tools for activity tracking + research_agent = LlmAgent( + name="Company Research Agent", + model="gemini-2.0-flash", + instruction=f"""You are a thorough company research analyst. + +When researching a company: +1. Use the search_company_info tool to gather information +2. Analyze the data you find +3. Provide a concise summary + +{runner.prompts.report_status}""", + tools=[ + search_company_info, + runner.tools.report_status, # Add the activity tracking tool + ], + ) + + # Run the agent with execution control + # RunConfig controls execution limits (default max_llm_calls=500) + print(f"Starting research with agent_id: {agent_id}") + print("-" * 60) + + run_config = RunConfig( + max_llm_calls=100, # Limit to 100 LLM calls to prevent runaway execution + ) + + result = await runner.run( + agent=research_agent, + input="Research Acme Corporation and provide a brief overview.", + run_config=run_config, + ) + + # Extract and display the result + print("\n" + "=" * 60) + print("FINAL RESULT:") + print("=" * 60) + if result.final_output: + print(result.final_output) + else: + print("No final output received") + + print(f"\nTotal events: {len(result.events)}") + + +async def streaming_example(): + """Example of using the streaming mode with event forwarding.""" + + agent_id = uuid.uuid4() + + runner = ax.GoogleADKRunner( + agent_id=agent_id, + app_name="streaming_demo", + ) + + agent = LlmAgent( + name="Streaming Agent", + model="gemini-2.0-flash", + instruction="You are a helpful assistant. Be concise.", + tools=[runner.tools.report_status], + ) + + # Define an event forwarder to process events in real-time + async def handle_event(event): + """Process each event as it arrives.""" + print(f"[EVENT] {type(event).__name__}") + if hasattr(event, "is_final_response") and event.is_final_response(): + print(f"[FINAL] {event.content.parts[0].text if event.content.parts else 'No text'}") + + print(f"Starting streaming agent with agent_id: {agent_id}") + print("-" * 60) + + result = await runner.run_streamed( + agent=agent, + input="What is 2 + 2?", + forwarder=handle_event, + ) + + print(f"\nStreaming complete. Total events: {len(result.events)}") + + +if __name__ == "__main__": + # Run the basic example + asyncio.run(main()) + + # Uncomment to run the streaming example + # asyncio.run(streaming_example()) diff --git a/src/agentexec/__init__.py b/src/agentexec/__init__.py index bb69f87..c12b3cd 100644 --- a/src/agentexec/__init__.py +++ b/src/agentexec/__init__.py @@ -70,3 +70,11 @@ async def research_company(agent_id: UUID, context: ResearchContext): __all__.append("OpenAIRunner") except ImportError: pass + +# Google ADK runner is only available if google-adk package is installed +try: + from agentexec.runners import GoogleADKRunner + + __all__.append("GoogleADKRunner") +except ImportError: + pass diff --git a/src/agentexec/runners/__init__.py b/src/agentexec/runners/__init__.py index d1af0fd..dc511db 100644 --- a/src/agentexec/runners/__init__.py +++ b/src/agentexec/runners/__init__.py @@ -11,3 +11,11 @@ __all__.append("OpenAIRunner") except ImportError: pass + +# Google ADK runner is only available if google-adk package is installed +try: + from agentexec.runners.google_adk import GoogleADKRunner + + __all__.append("GoogleADKRunner") +except ImportError: + pass diff --git a/src/agentexec/runners/google_adk.py b/src/agentexec/runners/google_adk.py new file mode 100644 index 0000000..edf9b12 --- /dev/null +++ b/src/agentexec/runners/google_adk.py @@ -0,0 +1,317 @@ +import logging +import uuid +from typing import Any, Callable + +from google.adk.agents import LlmAgent +from google.adk.core.run_config import RunConfig +from google.adk.runners import Runner +from google.adk.sessions import InMemorySessionService +from google.genai import types + +from agentexec.runners.base import BaseAgentRunner, _RunnerTools + + +logger = logging.getLogger(__name__) + + +class _GoogleADKRunnerTools(_RunnerTools): + """Google ADK-specific tools wrapper. + + Note: Google ADK tools are typically defined as regular Python functions. + The agent framework handles tool registration automatically. + """ + + @property + def report_status(self) -> Any: + """Get the status update tool for Google ADK. + + Returns the plain function as Google ADK handles tool registration. + """ + return super().report_status + + +class GoogleADKRunResult: + """Result wrapper for Google ADK agent execution.""" + + def __init__(self, final_content: types.Content | None, events: list[Any]): + """Initialize the result. + + Args: + final_content: The final response content from the agent. + events: List of all events generated during execution. + """ + self.final_content = final_content + self.events = events + + @property + def final_output(self) -> str | None: + """Extract the final text output from the result.""" + if self.final_content and self.final_content.parts: + # Extract text from the first part + for part in self.final_content.parts: + if hasattr(part, "text") and part.text: + return part.text + return None + + +class GoogleADKRunner(BaseAgentRunner): + """Runner for Google Agent Development Kit (ADK) with automatic activity tracking. + + This runner wraps the Google ADK and provides: + - Automatic agent_id generation + - Activity lifecycle management (QUEUED -> RUNNING -> COMPLETE/ERROR) + - Session management with InMemorySessionService + - Status update tool with agent_id pre-baked + - Streaming event support + - Execution control via RunConfig (max_llm_calls, etc.) + + Example: + from google.adk.core.run_config import RunConfig + + runner = agentexec.GoogleADKRunner( + agent_id=agent_id, + app_name="my_app", + report_status_prompt="Use report_activity(message, percentage) to report progress.", + ) + + agent = LlmAgent( + name="Research Agent", + model="gemini-2.0-flash", + instruction=f"Research companies. {runner.prompts.report_status}", + tools=[runner.tools.report_status], + ) + + # Use RunConfig to control execution limits (default max_llm_calls=500) + run_config = RunConfig(max_llm_calls=100) + + result = await runner.run( + agent=agent, + input="Research Acme Corp", + run_config=run_config, + ) + """ + + app_name: str + session_service: InMemorySessionService + _runner: Runner | None + + def __init__( + self, + agent_id: uuid.UUID, + *, + app_name: str = "agentexec", + report_status_prompt: str | None = None, + ) -> None: + """Initialize the Google ADK runner. + + Args: + agent_id: UUID for tracking this agent's activity. + app_name: Application name for session management. + report_status_prompt: Instruction snippet about using the status tool. + """ + # Google ADK handles execution control via RunConfig, so we disable + # the base class's max_turns_recovery feature + super().__init__( + agent_id, + max_turns_recovery=False, + recovery_turns=0, + wrap_up_prompt=None, + report_status_prompt=report_status_prompt, + ) + # Override with Google ADK-specific tools + self.tools = _GoogleADKRunnerTools(self.agent_id) + + # Initialize session service + self.app_name = app_name + self.session_service = InMemorySessionService() + self._runner = None + + def _get_or_create_runner(self, agent: LlmAgent) -> Runner: + """Get or create a Runner instance for the agent. + + Args: + agent: The LlmAgent to create a runner for. + + Returns: + Configured Runner instance. + """ + # Create a new runner for this agent + return Runner( + agent=agent, + app_name=self.app_name, + session_service=self.session_service, + ) + + async def _ensure_session(self, user_id: str, session_id: str) -> None: + """Ensure a session exists in the session service. + + Args: + user_id: User identifier for the session. + session_id: Session identifier. + """ + # Create session if it doesn't exist + try: + await self.session_service.create_session( + app_name=self.app_name, + user_id=user_id, + session_id=session_id, + ) + except Exception as e: + # Session might already exist, which is fine + logger.debug(f"Session creation note: {e}") + + async def run( + self, + agent: LlmAgent, + input: str | types.Content, + run_config: RunConfig | None = None, + user_id: str | None = None, + session_id: str | None = None, + ) -> GoogleADKRunResult: + """Run the agent with automatic activity tracking. + + Args: + agent: LlmAgent instance. + input: User input/prompt for the agent (string or Content object). + run_config: Optional RunConfig for execution control (max_llm_calls, etc.). + Defaults to RunConfig() with max_llm_calls=500. + user_id: User ID for session management (defaults to agent_id). + session_id: Session ID (defaults to agent_id). + + Returns: + GoogleADKRunResult with final output and events. + """ + # Use agent_id as default for user_id and session_id + user_id = user_id or str(self.agent_id) + session_id = session_id or str(self.agent_id) + + # Ensure session exists + await self._ensure_session(user_id, session_id) + + # Create runner + runner = self._get_or_create_runner(agent) + + # Convert input to Content if it's a string + if isinstance(input, str): + content = types.Content( + role="user", + parts=[types.Part(text=input)] + ) + else: + content = input + + # Use default RunConfig if not provided + if run_config is None: + run_config = RunConfig() + + # Run the agent and collect events + events = [] + final_content = None + + try: + async for event in runner.run_async( + user_id=user_id, + session_id=session_id, + new_message=content, + run_config=run_config, + ): + events.append(event) + if hasattr(event, "is_final_response") and event.is_final_response(): + final_content = event.content + + except Exception as e: + logger.error(f"Error during Google ADK agent execution: {e}") + raise + + return GoogleADKRunResult(final_content=final_content, events=events) + + async def run_streamed( + self, + agent: LlmAgent, + input: str | types.Content, + run_config: RunConfig | None = None, + forwarder: Callable | None = None, + user_id: str | None = None, + session_id: str | None = None, + ) -> GoogleADKRunResult: + """Run the agent in streaming mode with automatic activity tracking. + + The forwarder callback receives each event as it's generated, allowing + real-time processing of agent execution. This method is functionally + equivalent to run() but emphasizes the streaming nature of ADK's + run_async() method. + + Args: + agent: LlmAgent instance. + input: User input/prompt for the agent (string or Content object). + run_config: Optional RunConfig for execution control (max_llm_calls, etc.). + Defaults to RunConfig() with max_llm_calls=500. + forwarder: Optional async callback to process each event as it arrives. + user_id: User ID for session management (defaults to agent_id). + session_id: Session ID (defaults to agent_id). + + Returns: + GoogleADKRunResult with final output and events. + + Example: + from google.adk.core.run_config import RunConfig + + async def handle_event(event): + print(f"Event: {event}") + + result = await runner.run_streamed( + agent=agent, + input="Research XYZ", + run_config=RunConfig(max_llm_calls=100), + forwarder=handle_event + ) + """ + # Use agent_id as default for user_id and session_id + user_id = user_id or str(self.agent_id) + session_id = session_id or str(self.agent_id) + + # Ensure session exists + await self._ensure_session(user_id, session_id) + + # Create runner + runner = self._get_or_create_runner(agent) + + # Convert input to Content if it's a string + if isinstance(input, str): + content = types.Content( + role="user", + parts=[types.Part(text=input)] + ) + else: + content = input + + # Use default RunConfig if not provided + if run_config is None: + run_config = RunConfig() + + # Run the agent in streaming mode + events = [] + final_content = None + + try: + async for event in runner.run_async( + user_id=user_id, + session_id=session_id, + new_message=content, + run_config=run_config, + ): + events.append(event) + + # Forward event if forwarder is provided + if forwarder: + await forwarder(event) + + # Check if this is the final response + if hasattr(event, "is_final_response") and event.is_final_response(): + final_content = event.content + + except Exception as e: + logger.error(f"Error during Google ADK agent streaming execution: {e}") + raise + + return GoogleADKRunResult(final_content=final_content, events=events)