diff --git a/.github/workflows/lint-python.yml b/.github/workflows/lint-python.yml new file mode 100644 index 0000000..b0c5c3d --- /dev/null +++ b/.github/workflows/lint-python.yml @@ -0,0 +1,34 @@ +name: Lint Python Code + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + lint: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pylint + + - name: Install project dependencies + run: | + pip install -e . + + - name: Lint with pylint + run: | + pylint lemonade_arcade --rcfile .pylintrc + diff --git a/.github/workflows/test_lemonade_client.yml b/.github/workflows/test_lemonade_client.yml new file mode 100644 index 0000000..1c9e18d --- /dev/null +++ b/.github/workflows/test_lemonade_client.yml @@ -0,0 +1,59 @@ +name: Run LemonadeClient Tests + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + include: + # Linux with source installation + - os: ubuntu-latest + install-type: 'source' + test-name: 'Linux Source' + + # Windows with source installation + - os: windows-latest + install-type: 'source' + test-name: 'Windows Source' + + name: ${{ matrix.test-name }} + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python 3.10 + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install base dependencies + run: | + python -m pip install --upgrade pip + + - name: Install source dependencies + run: | + pip install -e . + + - name: Run unit tests + run: | + python test/lemonade_client_unit.py + + - name: Run integration tests + run: | + python test/lemonade_client_integration.py + env: + PYTHONUNBUFFERED: 1 + + - name: Test integration example + run: | + python examples/lemonade_client_integration_example.py + env: + PYTHONUNBUFFERED: 1 \ No newline at end of file diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..b944a49 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,18 @@ +[MASTER] + +# Ignore the builtin_games folder +ignore=builtin_games + +[MESSAGES CONTROL] + +# Disable the following warnings: +# W1203: logging-fstring-interpolation - Using f-strings in logging calls +# W0718: broad-exception-caught - Catching too general exception +# C0415: import-outside-toplevel - Import outside toplevel +# R0913: too-many-arguments - Too many arguments +# R1705: no-else-return - Unnecessary "else" after "return", remove the "else" and de-indent the code inside it +# R0917: too-many-positional-arguments - Too many positional arguments +# R0912: too-many-branches - Too many branches +# C0114: missing-module-docstring - Missing module docstring +# R0915: too-many-statements - Too many statements +disable=W1203,W0718,C0415,R0913,R1705,R0917,R0912,C0114,R0915 diff --git a/docs/lemonade_client_api.md b/docs/lemonade_client_api.md new file mode 100644 index 0000000..833e729 --- /dev/null +++ b/docs/lemonade_client_api.md @@ -0,0 +1,475 @@ +# LemonadeClient API Reference + +The `LemonadeClient` class provides a comprehensive interface for integrating with lemonade-server in your applications. It handles installation, configuration, server management, and model operations with cross-platform compatibility. + +## Suggested Workflow + +Here's the typical sequence of API calls for setting up a lemonade-server-based application: + +### 1. Initial Setup and Environment Check +```python +from lemonade_arcade.lemonade_client import LemonadeClient + +client = LemonadeClient() + +# Check deployment environment +is_pyinstaller = client.is_pyinstaller_environment() + +# Check if SDK is available (for development environments) +sdk_available = await client.check_lemonade_sdk_available() +``` + +### 2. Installation Status Check +```python +# Check if lemonade-server is installed and compatible +version_info = await client.check_lemonade_server_version() +if not version_info["installed"] or not version_info["compatible"]: + # Need to install or upgrade + result = await client.download_and_install_lemonade_server() + if result["success"]: + # Refresh environment after installation + client.refresh_environment() + client.reset_server_state() +``` + +### 3. Server Management +```python +# Check if server is running +is_running = await client.check_lemonade_server_running() +if not is_running: + # Start the server + start_result = await client.start_lemonade_server() + +# Verify API connectivity +api_online = await client.check_lemonade_server_api() +``` + +### 4. Model Management +```python +required_model = "Qwen3-0.6B-GGUF" + +# Check if model is installed +model_status = await client.check_model_installed(required_model) +if not model_status["installed"]: + # Install the model + install_result = await client.install_model(required_model) + +# Check if model is loaded +load_status = await client.check_model_loaded(required_model) +if not load_status["loaded"]: + # Load the model + load_result = await client.load_model(required_model) +``` + +### 5. Ready for Inference +Once the above steps complete successfully, your application can make inference requests to `client.url` (default: `http://localhost:8000`) using the OpenAI-compatible API. + +--- + +## API Reference + +### Environment and System Detection + +#### `is_pyinstaller_environment()` +Check if the application is running in a PyInstaller bundle environment. + +**When to use:** Determine installation method preferences or adjust behavior based on deployment type. PyInstaller environments typically prefer installer-based server installation over pip. + +**Returns:** `bool` - True if running in PyInstaller bundle, False otherwise + +**Example:** +```python +if client.is_pyinstaller_environment(): + print("Running as packaged executable") + # Prefer installer-based installation +else: + print("Running in development environment") + # Can use pip installation +``` + +--- + +#### `find_lemonade_server_paths()` +Find lemonade-server installation paths by scanning the system PATH. + +**When to use:** Discover where lemonade-server binaries are installed on the system. Helpful for apps that need to verify installation locations or debug path issues. + +**Returns:** `List[str]` - List of directory paths containing lemonade-server installations + +**Example:** +```python +paths = client.find_lemonade_server_paths() +print(f"Found lemonade-server in: {paths}") +``` + +--- + +#### `refresh_environment()` +Refresh environment variables from the system registry (Windows only). + +**When to use:** After installing lemonade-server to pick up newly added PATH entries without requiring an application restart. Essential for apps that install lemonade-server programmatically and need immediate access to the commands. + +**Example:** +```python +# After installation +await client.download_and_install_lemonade_server() +client.refresh_environment() # Pick up new PATH entries +client.reset_server_state() # Clear cached commands +``` + +--- + +#### `reset_server_state()` +Reset cached server state after installation changes or configuration updates. + +**When to use:** Call this when you've installed/updated lemonade-server or changed system configuration to ensure the client rediscovers server commands and processes. Essential after installation operations to avoid using stale cached paths. + +**Example:** +```python +# After any installation or configuration change +client.reset_server_state() +``` + +--- + +### Core Server Operations + +#### `execute_lemonade_server_command(args, timeout=10, use_popen=False, stdout_file=None, stderr_file=None)` +Execute lemonade-server commands using the best available method for the system. + +**When to use:** As the primary interface for running any lemonade-server command. The method automatically tries different installation methods (pip, installer, dev) and caches the successful command for future use. Essential for cross-platform compatibility. + +**Parameters:** +- `args: List[str]` - Command arguments to pass to lemonade-server (e.g., `["--version"]`, `["serve"]`) +- `timeout: int` - Maximum seconds to wait for command completion (ignored for background processes) +- `use_popen: bool` - True for background processes that shouldn't block, False for commands with output +- `stdout_file` - File handle to redirect standard output (only with use_popen=True) +- `stderr_file` - File handle to redirect error output (only with use_popen=True) + +**Returns:** `subprocess.CompletedProcess` for regular commands, `subprocess.Popen` for background processes, or `None` if all command attempts failed + +**Example:** +```python +# Check version +result = await client.execute_lemonade_server_command(["--version"]) +if result: + print(f"Version: {result.stdout}") + +# Start server in background +process = await client.execute_lemonade_server_command( + ["serve"], + use_popen=True +) +``` + +--- + +### Installation and Setup + +#### `check_lemonade_sdk_available()` +Check if the lemonade-sdk Python package is installed and importable. + +**When to use:** Determine if pip-based installation is available before attempting SDK-based operations. Helpful for showing installation options to users or choosing between different installation methods. + +**Returns:** `bool` - True if lemonade-sdk package can be imported, False otherwise + +**Example:** +```python +if await client.check_lemonade_sdk_available(): + print("Can use lemonade-server-dev command") +else: + print("Need to install via pip or use installer") +``` + +--- + +#### `check_lemonade_server_version()` +Check lemonade-server installation status and version compatibility. + +**When to use:** Verify that lemonade-server is installed and meets minimum version requirements before attempting to use server features. Essential for displaying installation status and guiding users through setup. + +**Returns:** `dict` with keys: +- `installed: bool` - Whether lemonade-server is found +- `version: str` - Version string or None +- `compatible: bool` - Whether version meets minimum requirements +- `required_version: str` - Minimum required version + +**Example:** +```python +version_info = await client.check_lemonade_server_version() +if version_info["installed"] and version_info["compatible"]: + print(f"lemonade-server {version_info['version']} is ready") +else: + print(f"Need version {version_info['required_version']} or higher") +``` + +--- + +#### `install_lemonade_sdk_package()` +Install the lemonade-sdk Python package using pip. + +**When to use:** Install lemonade-server via pip when in development environments or when the SDK approach is preferred. Provides access to lemonade-server-dev command after successful installation. + +**Returns:** `dict` with keys: +- `success: bool` - Whether installation succeeded +- `message: str` - Success message or error details + +**Example:** +```python +result = await client.install_lemonade_sdk_package() +if result["success"]: + print("SDK installed successfully") + client.refresh_environment() +else: + print(f"Installation failed: {result['message']}") +``` + +--- + +#### `download_and_install_lemonade_server()` +Download and install lemonade-server using the best method for the environment. + +**When to use:** As the primary installation method. Automatically chooses between pip installation (development environments) or executable installer (PyInstaller bundles). Handles the complete installation process including download and setup. + +**Returns:** `dict` with keys: +- `success: bool` - Whether installation succeeded +- `message: str` - Status message or error details +- `interactive: bool` (optional) - Whether installer requires user interaction +- `github_link: str` (optional) - Link for manual installation if automated fails + +**Example:** +```python +result = await client.download_and_install_lemonade_server() +if result["success"]: + print("Installation completed") + if result.get("interactive"): + print("Please complete the installer UI") + client.refresh_environment() + client.reset_server_state() +else: + print(f"Installation failed: {result['message']}") + if "github_link" in result: + print(f"Manual installation: {result['github_link']}") +``` + +--- + +### Server Status and Management + +#### `check_lemonade_server_running()` +Check if the lemonade-server process is currently running. + +**When to use:** Determine server status before attempting operations that require a running server. Helps decide whether to start the server or proceed with API calls. + +**Returns:** `bool` - True if server process is running, False otherwise + +**Example:** +```python +if await client.check_lemonade_server_running(): + print("Server is running") +else: + print("Need to start server") + await client.start_lemonade_server() +``` + +--- + +#### `start_lemonade_server()` +Start the lemonade-server process in the background. + +**When to use:** Launch the server when it's not running and your app needs server functionality. The server runs in a separate process and the method tracks the process to avoid multiple instances. + +**Returns:** `dict` with keys: +- `success: bool` - Whether server started successfully +- `message: str` - Status message or error details + +**Example:** +```python +result = await client.start_lemonade_server() +if result["success"]: + print("Server started successfully") + # Wait a moment for startup + await asyncio.sleep(2) +else: + print(f"Failed to start server: {result['message']}") +``` + +--- + +#### `check_lemonade_server_api()` +Check if the lemonade-server API is responding to requests. + +**When to use:** Verify that the server is not only running but also accepting API connections. More reliable than process checks for determining if the server is ready to handle requests. + +**Returns:** `bool` - True if server API is responding, False otherwise + +**Example:** +```python +if await client.check_lemonade_server_api(): + print("API is ready for requests") + # Can now make inference calls +else: + print("API not responding, check server status") +``` + +--- + +### Model Management + +#### `get_available_models()` +Retrieve the list of models available on the lemonade-server. + +**When to use:** Discover which models are installed and available for use. Helpful for displaying model options to users or verifying that required models are available before attempting to use them. + +**Returns:** `List[str]` - List of model names/IDs available on the server, empty list if none found + +**Example:** +```python +models = await client.get_available_models() +print(f"Available models: {models}") +for model in models: + print(f" - {model}") +``` + +--- + +#### `check_model_installed(model)` +Check if a specific model is installed on the server. + +**When to use:** Verify model availability before attempting to load or use a model. Essential for apps that depend on specific models to function properly. + +**Parameters:** +- `model: str` - The model name/ID to check for (e.g., "Qwen3-0.6B-GGUF") + +**Returns:** `dict` with keys: +- `installed: bool` - Whether the model is available +- `model_name: str` - The requested model name + +**Example:** +```python +required_model = "Qwen3-0.6B-GGUF" +status = await client.check_model_installed(required_model) +if status["installed"]: + print(f"Model {required_model} is available") +else: + print(f"Need to install {required_model}") + await client.install_model(required_model) +``` + +--- + +#### `check_model_loaded(model)` +Check if a specific model is currently loaded and ready for inference. + +**When to use:** Verify that a model is loaded before making inference requests. Models must be loaded before they can be used for chat completions or other inference operations. + +**Parameters:** +- `model: str` - The model name/ID to check (e.g., "Qwen3-0.6B-GGUF") + +**Returns:** `dict` with keys: +- `loaded: bool` - Whether the model is currently loaded +- `model_name: str` - The requested model name +- `current_model: str` - Name of currently loaded model (may be different) + +**Example:** +```python +required_model = "Qwen3-0.6B-GGUF" +status = await client.check_model_loaded(required_model) +if status["loaded"]: + print(f"Model {required_model} is ready for inference") +else: + current = status["current_model"] + print(f"Current model: {current}, need to load {required_model}") + await client.load_model(required_model) +``` + +--- + +#### `install_model(model)` +Download and install a model on the lemonade-server. + +**When to use:** Install models that your app requires but aren't currently available on the server. The installation process may take several minutes for large models and requires an active internet connection. + +**Parameters:** +- `model: str` - The model name/ID to install (e.g., "Qwen3-0.6B-GGUF") + +**Returns:** `dict` with keys: +- `success: bool` - Whether installation succeeded +- `message: str` - Success message or error details + +**Example:** +```python +model_name = "Qwen3-0.6B-GGUF" +print(f"Installing {model_name}...") +result = await client.install_model(model_name) +if result["success"]: + print("Model installed successfully") +else: + print(f"Installation failed: {result['message']}") +``` + +--- + +#### `load_model(model)` +Load a model into memory for inference operations. + +**When to use:** Prepare an installed model for use. Models must be loaded before they can handle chat completions or other inference requests. Only one model can be loaded at a time. + +**Parameters:** +- `model: str` - The model name/ID to load (e.g., "Qwen3-0.6B-GGUF") + +**Returns:** `dict` with keys: +- `success: bool` - Whether model loaded successfully +- `message: str` - Success message or error details + +**Example:** +```python +model_name = "Qwen3-0.6B-GGUF" +print(f"Loading {model_name}...") +result = await client.load_model(model_name) +if result["success"]: + print("Model loaded and ready for inference") + # Can now make API calls to client.url +else: + print(f"Loading failed: {result['message']}") +``` + +--- + +## Error Handling + +Most methods return dictionaries with `success` and `message` keys for operations, or boolean values for status checks. Always check return values: + +```python +# For operations +result = await client.start_lemonade_server() +if not result["success"]: + print(f"Operation failed: {result['message']}") + return + +# For status checks +if not await client.check_lemonade_server_api(): + print("Server API is not responding") + return +``` + +## Integration Example + +For a complete, runnable example of integrating LemonadeClient into an application, see: + +**[examples/lemonade_client_integration_example.py](../examples/lemonade_client_integration_example.py)** + +This example demonstrates: +- Complete setup workflow from installation to ready-for-inference +- Error handling and status checking +- Model installation and loading +- Basic inference testing +- Proper logging and user feedback + +You can run the example directly to test your LemonadeClient setup: + +```bash +python lemonade-arcade/examples/lemonade_client_integration_example.py +``` + +The example includes comprehensive error handling and step-by-step progress reporting, making it suitable for both learning and as a starting point for your own integration. diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..bfbb912 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,41 @@ +# LemonadeClient Examples + +This directory contains practical examples demonstrating how to use the LemonadeClient class in your applications. + +## Available Examples + +### [lemonade_client_integration_example.py](lemonade_client_integration_example.py) + +A comprehensive example showing the complete workflow for integrating LemonadeClient into an application: + +- โœ… **Installation checking** - Verify lemonade-server is installed and compatible +- ๐Ÿš€ **Server management** - Start and verify server status +- ๐Ÿค– **Model operations** - Install, load, and verify model availability +- ๐Ÿงช **Inference testing** - Basic API connectivity and inference test +- ๐Ÿ“ **Error handling** - Proper exception handling and user feedback + +**Usage:** +```bash +python lemonade-arcade/examples/lemonade_client_integration_example.py +``` + +This example uses the `Qwen3-0.6B-GGUF` model as it's lightweight and good for testing. You can modify the `required_model` variable to use different models as needed. + +## Running Examples + +All examples are standalone Python scripts that can be run directly. They include proper error handling and progress reporting, making them suitable for: + +- Learning how to integrate LemonadeClient +- Testing your lemonade-server setup +- Starting point for your own applications +- CI/CD pipeline validation + +## Adding New Examples + +When adding new examples: + +1. Include comprehensive docstrings and comments +2. Add proper error handling with informative messages +3. Use realistic model names and parameters +4. Include progress reporting for long-running operations +5. Update this README with a description of the new example diff --git a/examples/lemonade_client_integration_example.py b/examples/lemonade_client_integration_example.py new file mode 100644 index 0000000..76cf1b3 --- /dev/null +++ b/examples/lemonade_client_integration_example.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +""" +LemonadeClient Integration Example + +Complete example of integrating LemonadeClient into an application. +""" + +import asyncio +from openai import AsyncOpenAI + +from lemonade_arcade.lemonade_client import LemonadeClient + + +# Run everything in an async context +async def main(): + client = LemonadeClient() + required_model = "Qwen3-0.6B-GGUF" + + # Check installation + version_info = await client.check_lemonade_server_version() + if not version_info["installed"] or not version_info["compatible"]: + print("Installing lemonade-server...") + result = await client.download_and_install_lemonade_server() + if not result["success"]: + raise Exception(f"Installation failed: {result['message']}") + client.refresh_environment() + client.reset_server_state() + + # Start server + if not await client.check_lemonade_server_running(): + print("Starting server...") + result = await client.start_lemonade_server() + if not result["success"]: + raise Exception(f"Server start failed: {result['message']}") + await asyncio.sleep(3) # Wait for startup + + # Verify API + if not await client.check_lemonade_server_api(): + raise Exception("Server API not responding") + + # Setup model + model_status = await client.check_model_installed(required_model) + if not model_status["installed"]: + print(f"Installing model {required_model}...") + result = await client.install_model(required_model) + if not result["success"]: + raise Exception(f"Model installation failed: {result['message']}") + + load_status = await client.check_model_loaded(required_model) + if not load_status["loaded"]: + print(f"Loading model {required_model}...") + result = await client.load_model(required_model) + if not result["success"]: + raise Exception(f"Model loading failed: {result['message']}") + + print(f"lemonade-server ready at {client.url}") + + # Make a chat completion request using OpenAI library + openai_client = AsyncOpenAI( + base_url=f"{client.url}/api/v1", + api_key="dummy", # lemonade-server doesn't require a real API key + ) + + response = await openai_client.chat.completions.create( + model=required_model, + messages=[ + {"role": "user", "content": "Hello! Please respond with 'Hello there!'"} + ], + max_tokens=50, + temperature=0.1, + ) + + print(f"Response: {response.choices[0].message.content}") + + +asyncio.run(main()) diff --git a/lemonade_arcade/lemonade_client.py b/lemonade_arcade/lemonade_client.py new file mode 100644 index 0000000..1cbeb9b --- /dev/null +++ b/lemonade_arcade/lemonade_client.py @@ -0,0 +1,917 @@ +import logging +import os +import subprocess +import sys +import tempfile +from typing import List +import httpx + + +LEMONADE_MINIMUM_VERSION = "8.1.5" + +logger = logging.getLogger("lemonade_arcade.main") + + +class LemonadeClient: + """ + Detect, install, and set up Lemonade Client. + This class makes it easier to start a new Python project with Lemonade + by automating many common tasks. + """ + + def __init__(self): + # Track which command is used for this server instance + self.server_command = None + # Track the server process to avoid starting multiple instances + self.server_process = None + + self.url = "http://localhost:8000" + + def is_pyinstaller_environment(self): + """ + Check if the application is running in a PyInstaller bundle environment. + + Use this when your app needs to determine installation method preferences + or adjust behavior based on deployment type. PyInstaller environments + typically prefer installer-based server installation over pip. + + Returns: + bool: True if running in PyInstaller bundle, False otherwise + """ + return getattr(sys, "frozen", False) + + def find_lemonade_server_paths(self): + """ + Find lemonade-server installation paths by scanning the system PATH. + + Use this to discover where lemonade-server binaries are installed on the system. + Helpful for apps that need to verify installation locations or debug path issues. + + Returns: + List[str]: List of directory paths containing lemonade-server installations + """ + paths = [] + + # Check current PATH for lemonade_server/bin directories + current_path = os.environ.get("PATH", "") + # Use the correct path separator for the platform + path_separator = ";" if sys.platform == "win32" else ":" + for path_entry in current_path.split(path_separator): + path_entry = path_entry.strip() + if "lemonade_server" in path_entry.lower() and "bin" in path_entry.lower(): + if os.path.exists(path_entry): + paths.append(path_entry) + logger.info(f"Found lemonade-server path in PATH: {path_entry}") + + return paths + + def reset_server_state(self): + """ + Reset cached server state after installation changes or configuration updates. + + Call this when you've installed/updated lemonade-server or changed system configuration + to ensure the client rediscovers server commands and processes. Essential after + installation operations to avoid using stale cached paths. + """ + + logger.info("Resetting server state") + self.server_command = None + if self.server_process and self.server_process.poll() is None: + try: + self.server_process.terminate() + except Exception: + pass + self.server_process = None + + def refresh_environment(self): + """ + Refresh environment variables from the system registry (Windows only). + + Use this after installing lemonade-server to pick up newly added PATH entries + without requiring an application restart. Essential for apps that install + lemonade-server programmatically and need immediate access to the commands. + """ + try: + if sys.platform == "win32": + # pylint: disable=import-error + # This will raise an import exception on linux right now + import winreg + + logger.info("Refreshing environment variables...") + + # Get system PATH + with winreg.OpenKey( + winreg.HKEY_LOCAL_MACHINE, + r"SYSTEM\CurrentControlSet\Control\Session Manager\Environment", + ) as key: + system_path = winreg.QueryValueEx(key, "PATH")[0] + + # Get user PATH + try: + with winreg.OpenKey( + winreg.HKEY_CURRENT_USER, r"Environment" + ) as key: + user_path = winreg.QueryValueEx(key, "PATH")[0] + except FileNotFoundError: + user_path = "" + + # Combine and update current process environment + new_path = system_path + if user_path: + new_path = user_path + ";" + system_path + + # Also add common Python Scripts directories that pip might use + python_scripts_paths = self._discover_python_scripts_paths() + + # Add these paths to the PATH if they're not already there + for scripts_path in python_scripts_paths: + if scripts_path.lower() not in new_path.lower(): + new_path = scripts_path + ";" + new_path + logger.info(f"Added {scripts_path} to PATH") + + os.environ["PATH"] = new_path + logger.info(f"Updated PATH: {new_path[:200]}...") # Log first 200 chars + else: + logger.info( + "Non-Windows platform, skipping registry-based PATH refresh" + ) + + except Exception as e: + logger.warning(f"Failed to refresh environment: {e}") + + def _discover_python_scripts_paths(self): + """Discover Python Scripts directories where pip installs console scripts.""" + python_scripts_paths = [] + + # Add Python Scripts directory (where pip installs console scripts) + python_base = os.path.dirname(sys.executable) + scripts_dir = os.path.join(python_base, "Scripts") + if os.path.exists(scripts_dir): + python_scripts_paths.append(scripts_dir) + logger.info(f"Found Python Scripts directory: {scripts_dir}") + + # Add user site-packages Scripts directory + try: + import site + + user_site = site.getusersitepackages() + if user_site: + user_scripts = os.path.join(os.path.dirname(user_site), "Scripts") + if os.path.exists(user_scripts): + python_scripts_paths.append(user_scripts) + logger.info(f"Found user Scripts directory: {user_scripts}") + except Exception: + pass + + return python_scripts_paths + + async def execute_lemonade_server_command( + self, + args: List[str], + timeout: int = 10, + use_popen: bool = False, + stdout_file=None, + stderr_file=None, + ): + """ + Execute lemonade-server commands using the best available method for the system. + + Use this as the primary interface for running any lemonade-server command. The method + automatically tries different installation methods (pip, installer, dev) and caches + the successful command for future use. Essential for cross-platform compatibility. + + Args: + args: Command arguments to pass to lemonade-server (e.g., ["--version"], ["serve"]) + timeout: Maximum seconds to wait for command completion + (ignored for background processes) + use_popen: True for background processes that shouldn't block, + False for commands with output + stdout_file: File handle to redirect standard output (only with use_popen=True) + stderr_file: File handle to redirect error output (only with use_popen=True) + + Returns: + subprocess.CompletedProcess for regular commands, subprocess.Popen for + background processes, or None if all command attempts failed + """ + logger.info(f"Executing lemonade-server command with args: {args}") + + # If we already know which command to use, use only that one + if self.server_command: + commands_to_try = [self.server_command + args] + else: + # Try different ways to find lemonade-server based on platform + commands_to_try = [] + + if sys.platform == "win32": + # Windows: Try traditional commands first, then Python module fallback + if not self.is_pyinstaller_environment(): + commands_to_try.append(["lemonade-server-dev"] + args) + + # Windows traditional commands + commands_to_try.extend( + [ + ["lemonade-server"] + args, + ["lemonade-server.bat"] + args, + ] + ) + + # Add dynamically discovered Windows paths + for bin_path in self.find_lemonade_server_paths(): + commands_to_try.extend( + [ + [os.path.join(bin_path, "lemonade-server.exe")] + args, + [os.path.join(bin_path, "lemonade-server.bat")] + args, + ] + ) + + # Python module fallback (most reliable after pip install) + commands_to_try.append([sys.executable, "-m", "lemonade_server"] + args) + else: + # Linux/Unix: Try lemonade-server-dev first, then Python module fallback + commands_to_try.append(["lemonade-server-dev"] + args) + commands_to_try.append([sys.executable, "-m", "lemonade_server"] + args) + + for i, cmd in enumerate(commands_to_try): + try: + logger.info(f"Trying command {i+1}: {cmd}") + + # Determine if we should use shell=True based on command type + use_shell = not (len(cmd) >= 3 and cmd[1] == "-m") + final_cmd = " ".join(cmd) if use_shell else cmd + + if use_popen: + # For background processes (like server start) + process = subprocess.Popen( + final_cmd, + stdout=stdout_file or subprocess.PIPE, + stderr=stderr_file or subprocess.PIPE, + creationflags=( + subprocess.CREATE_NO_WINDOW + if sys.platform == "win32" + else 0 + ), + shell=use_shell, + env=os.environ.copy(), + ) + + # Store the successful command for future use + if not self.server_command: + self.server_command = cmd[: -len(args)] + logger.info(f"Stored server command: {self.server_command}") + + return process + else: + # For regular commands with output + result = subprocess.run( + final_cmd, + capture_output=True, + text=True, + timeout=timeout, + shell=use_shell, + env=os.environ.copy(), + check=False, # Don't raise exception on non-zero exit + ) + + logger.debug(f"Command {i+1} returned code: {result.returncode}") + logger.debug(f"Command {i+1} stdout: '{result.stdout}'") + logger.debug(f"Command {i+1} stderr: '{result.stderr}'") + + if result.returncode == 0: + # Store the successful command for future use + if not self.server_command: + self.server_command = cmd[: -len(args)] + logger.info(f"Stored server command: {self.server_command}") + + return result + else: + logger.debug( + f"Command {i+1} failed with return code {result.returncode}" + ) + if result.stderr: + logger.debug(f"stderr: {result.stderr}") + # Try next command + continue + + except FileNotFoundError as e: + logger.debug(f"Command {i+1} not found: {e}") + continue + except subprocess.TimeoutExpired as e: + logger.debug(f"Command {i+1} timed out: {e}") + continue + except Exception as e: + logger.error(f"Unexpected error with command {i+1}: {e}") + continue + + # If we get here, all commands failed + logger.error("All lemonade-server commands failed") + return None + + async def check_lemonade_sdk_available(self): + """ + Check if the lemonade-sdk Python package is installed and importable. + + Use this to determine if pip-based installation is available before attempting + SDK-based operations. Helpful for showing installation options to users or + choosing between different installation methods. + + Returns: + bool: True if lemonade-sdk package can be imported, False otherwise + """ + logger.info("Checking for lemonade-sdk package...") + try: + # Handle Windows vs Unix path quoting differently + cmd = [sys.executable, "-c", "import lemonade_server; print('available')"] + + if sys.platform == "win32": + # On Windows, quote paths with spaces using double quotes + quoted_args = [] + for arg in cmd: + if " " in arg: + quoted_args.append(f'"{arg}"') + else: + quoted_args.append(arg) + cmd_str = " ".join(quoted_args) + else: + # On Unix systems, use shlex.quote + import shlex + + cmd_str = " ".join(shlex.quote(arg) for arg in cmd) + + logger.debug(f"Executing command: {cmd_str}") + result = subprocess.run( + cmd_str, + capture_output=True, + text=True, + timeout=10, + shell=True, # Keep shell=True for environment handling + check=False, # Don't raise exception on non-zero exit + ) + + logger.debug( + f"Command result: returncode={result.returncode}, " + f"stdout='{result.stdout.strip()}', stderr='{result.stderr.strip()}'" + ) + is_available = result.returncode == 0 and "available" in result.stdout + logger.info(f"lemonade-sdk package available: {is_available}") + return is_available + except Exception as e: + logger.info(f"lemonade-sdk package check failed: {e}") + return False + + async def check_lemonade_server_version(self): + """ + Check lemonade-server installation status and version compatibility. + + Use this to verify that lemonade-server is installed and meets minimum version + requirements before attempting to use server features. Essential for displaying + installation status and guiding users through setup. + + Returns: + dict: Contains 'installed' (bool), 'version' (str), 'compatible' (bool), + and 'required_version' (str) keys + """ + logger.info("Checking lemonade-server version...") + + result = await self.execute_lemonade_server_command(["--version"]) + + if result is None: + logger.error("All lemonade-server commands failed") + return { + "installed": False, + "version": None, + "compatible": False, + "required_version": LEMONADE_MINIMUM_VERSION, + } + + version_line = result.stdout.strip() + logger.info(f"Raw version output: '{version_line}'") + + # Extract version number (format might be "lemonade-server 8.1.3" or just "8.1.3") + import re + + version_match = re.search(r"(\d+\.\d+\.\d+)", version_line) + if version_match: + version = version_match.group(1) + logger.info(f"Extracted version: {version}") + + # Check if the version number is allowed + version_parts = [int(x) for x in version.split(".")] + required_parts = [int(x) for x in LEMONADE_MINIMUM_VERSION.split(".")] + is_compatible = version_parts >= required_parts + logger.info( + f"Version parts: {version_parts}, Required: {required_parts}, " + "Compatible: {is_compatible}" + ) + + return { + "installed": True, + "version": version, + "compatible": is_compatible, + "required_version": LEMONADE_MINIMUM_VERSION, + } + else: + logger.warning(f"Could not extract version from output: '{version_line}'") + return { + "installed": True, + "version": "unknown", + "compatible": False, + "required_version": LEMONADE_MINIMUM_VERSION, + } + + async def check_lemonade_server_running(self): + """ + Check if the lemonade-server process is currently running. + + Use this to determine server status before attempting operations that require + a running server. Helps decide whether to start the server or proceed with + API calls. + + Returns: + bool: True if server process is running, False otherwise + """ + logger.info("Checking if lemonade-server is running...") + + result = await self.execute_lemonade_server_command(["status"]) + + if result is None: + logger.error("All lemonade-server status commands failed") + return False + + output = result.stdout.strip() + logger.info(f"Status output: '{output}'") + if "Server is running" in output: + logger.info("Server is running according to status command") + return True + else: + logger.info("Server is not running according to status command") + return False + + async def start_lemonade_server(self): + """ + Start the lemonade-server process in the background. + + Use this to launch the server when it's not running and your app needs server + functionality. The server runs in a separate process and the method tracks the + process to avoid multiple instances. + + Returns: + dict: Contains 'success' (bool) and 'message' (str) keys indicating + whether the server started successfully + """ + logger.info("Attempting to start lemonade-server...") + + # Check if server is already running + if self.server_process and self.server_process.poll() is None: + logger.info("Server process is already running") + return {"success": True, "message": "Server is already running"} + + stdout_file = tempfile.NamedTemporaryFile( + mode="w+", delete=False, suffix=".log" + ) + stderr_file = tempfile.NamedTemporaryFile( + mode="w+", delete=False, suffix=".log" + ) + + # Use the unified function to start the server + process = await self.execute_lemonade_server_command( + ["serve"], use_popen=True, stdout_file=stdout_file, stderr_file=stderr_file + ) + + if process is None: + logger.error("All lemonade-server start commands failed") + stdout_file.close() + stderr_file.close() + try: + os.unlink(stdout_file.name) + os.unlink(stderr_file.name) + except Exception: + pass + return { + "success": False, + "message": "Failed to start server: all commands failed", + } + + # Give the process a moment to start and check if it's still running + import time + + time.sleep(1) + + # Check if process is still alive + if process.poll() is None: + logger.info(f"Successfully started lemonade-server with PID: {process.pid}") + self.server_process = process + + # Close temp files + stdout_file.close() + stderr_file.close() + + return {"success": True, "message": "Server start command issued"} + else: + # Process died immediately, check error output + stdout_file.close() + stderr_file.close() + + # Read the error output + try: + with open(stderr_file.name, "r", encoding="utf-8") as f: + stderr_content = f.read().strip() + with open(stdout_file.name, "r", encoding="utf-8") as f: + stdout_content = f.read().strip() + + logger.error( + f"Server failed immediately. Return code: {process.returncode}" + ) + if stderr_content: + logger.error(f"Stderr: {stderr_content}") + if stdout_content: + logger.info(f"Stdout: {stdout_content}") + + # Clean up temp files + try: + os.unlink(stdout_file.name) + os.unlink(stderr_file.name) + except Exception: + pass + + except Exception as read_error: + logger.error(f"Could not read process output: {read_error}") + + return {"success": False, "message": "Server process died immediately"} + + async def install_lemonade_sdk_package(self): + """ + Install the lemonade-sdk Python package using pip. + + Use this to install lemonade-server via pip when in development environments + or when the SDK approach is preferred. Provides access to lemonade-server-dev + command after successful installation. + + Returns: + dict: Contains 'success' (bool) and 'message' (str) keys indicating + installation result and any error details + """ + try: + logger.info("Installing lemonade-sdk package using pip...") + + # Install the package + result = subprocess.run( + [sys.executable, "-m", "pip", "install", "lemonade-sdk"], + capture_output=True, + text=True, + timeout=300, # 5 minutes timeout + check=True, + ) + + if result.returncode == 0: + logger.info("lemonade-sdk package installed successfully") + return { + "success": True, + "message": "lemonade-sdk package installed successfully. " + "You can now use 'lemonade-server-dev' command.", + } + else: + error_msg = ( + result.stderr or result.stdout or "Unknown installation error" + ) + logger.error(f"pip install failed: {error_msg}") + return {"success": False, "message": f"pip install failed: {error_msg}"} + + except Exception as e: + logger.error(f"Failed to install lemonade-sdk package: {e}") + return {"success": False, "message": f"Failed to install: {e}"} + + async def download_and_install_lemonade_server(self): + """ + Download and install lemonade-server using the best method for the environment. + + Use this as the primary installation method. Automatically chooses between pip + installation (development environments) or executable installer (PyInstaller bundles). + Handles the complete installation process including download and setup. + + Returns: + dict: Contains 'success' (bool), 'message' (str), and optionally 'interactive' (bool) + or 'github_link' (str) keys with installation results and next steps + """ + + # Reset server state since we're installing/updating + self.reset_server_state() + + # If not in PyInstaller environment, prefer pip installation + if not self.is_pyinstaller_environment(): + logger.info( + "Development environment detected, attempting pip installation first..." + ) + pip_result = await self.install_lemonade_sdk_package() + if pip_result["success"]: + return pip_result + else: + logger.info( + "pip installation failed, falling back to GitHub instructions..." + ) + return { + "success": False, + "message": "Could not install lemonade-sdk package. " + "Please visit https://github.com/lemonade-sdk/lemonade for " + "installation instructions.", + "github_link": "https://github.com/lemonade-sdk/lemonade", + } + + # PyInstaller environment or fallback - use installer for Windows + try: + # Download the installer + # pylint: disable=line-too-long + installer_url = "https://github.com/lemonade-sdk/lemonade/releases/latest/download/Lemonade_Server_Installer.exe" + + # Create temp directory for installer + temp_dir = tempfile.mkdtemp() + installer_path = os.path.join(temp_dir, "Lemonade_Server_Installer.exe") + + logger.info(f"Downloading installer from {installer_url}") + + # Download with progress tracking + async with httpx.AsyncClient( + timeout=300.0, follow_redirects=True + ) as client: + async with client.stream("GET", installer_url) as response: + if response.status_code != 200: + return { + "success": False, + "message": f"Failed to download installer: HTTP {response.status_code}", + } + + with open(installer_path, "wb") as f: + async for chunk in response.aiter_bytes(8192): + f.write(chunk) + + logger.info(f"Downloaded installer to {installer_path}") + + # Run interactive installation (not silent) + install_cmd = [installer_path] + + logger.info(f"Running interactive installation: {' '.join(install_cmd)}") + + # Start the installer but don't wait for it to complete + # This allows the user to see the installation UI + # pylint: disable=consider-using-with + subprocess.Popen(install_cmd) + + return { + "success": True, + "message": "Installer launched. Please complete the installation and then restart Lemonade Arcade.", + "interactive": True, + } + + except Exception as e: + logger.error(f"Failed to download/install lemonade-server: {e}") + return {"success": False, "message": f"Failed to install: {e}"} + + async def check_lemonade_server_api(self): + """ + Check if the lemonade-server API is responding to requests. + + Use this to verify that the server is not only running but also accepting + API connections. More reliable than process checks for determining if the + server is ready to handle requests. + + Returns: + bool: True if server API is responding, False otherwise + """ + logger.info(f"Checking Lemonade Server at {self.url}") + + # Try multiple times with increasing delays to give server time to start + for attempt in range(3): + try: + # Use a longer timeout and retry logic for more robust checking + async with httpx.AsyncClient(timeout=15.0) as client: + response = await client.get(f"{self.url}/api/v1/models") + logger.info( + f"Server check attempt {attempt + 1} response status: " + f"{response.status_code}" + ) + if response.status_code == 200: + return True + elif response.status_code == 404: + # Try the health endpoint if models endpoint doesn't exist + logger.info("Models endpoint not found, trying health endpoint") + try: + health_response = await client.get(f"{self.url}/health") + logger.info( + f"Health check response status: {health_response.status_code}" + ) + return health_response.status_code == 200 + except Exception as e: + logger.info(f"Health check failed: {e}") + + except httpx.TimeoutException: + logger.info( + f"Server check attempt {attempt + 1} timed out - server might be starting up" + ) + except httpx.ConnectError as e: + logger.info( + f"Server check attempt {attempt + 1} connection failed: {e}" + ) + except Exception as e: + logger.info(f"Server check attempt {attempt + 1} failed: {e}") + + # Wait before next attempt (except on last attempt) + if attempt < 2: + import asyncio + + await asyncio.sleep(2) + + logger.info("All server check attempts failed") + return False + + async def get_available_models(self): + """ + Retrieve the list of models available on the lemonade-server. + + Use this to discover which models are installed and available for use. + Helpful for displaying model options to users or verifying that required + models are available before attempting to use them. + + Returns: + List[str]: List of model names/IDs available on the server, empty list if none found + """ + logger.info("Getting available models from Lemonade Server") + try: + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.get(f"{self.url}/api/v1/models") + if response.status_code == 200: + data = response.json() + models = [model["id"] for model in data.get("data", [])] + logger.info(f"Found {len(models)} available models: {models}") + return models + else: + logger.warning( + f"Failed to get models, status: {response.status_code}" + ) + except Exception as e: + logger.info(f"Error getting models: {e}") + return [] + + async def check_model_installed(self, model): + """ + Check if a specific model is installed on the server. + + Use this to verify model availability before attempting to load or use a model. + Essential for apps that depend on specific models to function properly. + + Args: + model: The model name/ID to check for (e.g., "Qwen3-Coder-30B-A3B-Instruct-GGUF") + + Returns: + dict: Contains 'installed' (bool) and 'model_name' (str) keys + """ + logger.info(f"Checking for required model: {model}") + + try: + models = await self.get_available_models() + is_installed = model in models + logger.info(f"Required model installed: {is_installed}") + return {"installed": is_installed, "model_name": model} + except Exception as e: + logger.error(f"Error checking required model: {e}") + return {"installed": False, "model_name": model} + + async def check_model_loaded(self, model): + """ + Check if a specific model is currently loaded and ready for inference. + + Use this to verify that a model is loaded before making inference requests. + Models must be loaded before they can be used for chat completions or other + inference operations. + + Args: + model: The model name/ID to check (e.g., "Qwen3-Coder-30B-A3B-Instruct-GGUF") + + Returns: + dict: Contains 'loaded' (bool), 'model_name' (str), and 'current_model' (str) keys + """ + logger.info(f"Checking if model is loaded: {model}") + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get(f"{self.url}/api/v1/health") + + if response.status_code == 200: + status_data = response.json() + # Check if the required model is the currently loaded model + loaded_model = status_data.get("model_loaded", "") + is_loaded = loaded_model == model + logger.info( + f"Model loaded status: {is_loaded}, current model: {loaded_model}" + ) + return { + "loaded": is_loaded, + "model_name": model, + "current_model": loaded_model, + } + else: + logger.warning( + f"Failed to get server status: HTTP {response.status_code}" + ) + return { + "loaded": False, + "model_name": model, + "current_model": None, + } + except Exception as e: + logger.error(f"Error checking model loaded status: {e}") + return { + "loaded": False, + "model_name": model, + "current_model": None, + } + + async def install_model(self, model): + """ + Download and install a model on the lemonade-server. + + Use this to install models that your app requires but aren't currently available + on the server. The installation process may take several minutes for large models + and requires an active internet connection. + + Args: + model: The model name/ID to install (e.g., "Qwen3-Coder-30B-A3B-Instruct-GGUF") + + Returns: + dict: Contains 'success' (bool) and 'message' (str) keys indicating + installation result and any error details + """ + logger.info(f"Installing model: {model}") + + try: + async with httpx.AsyncClient( + timeout=600.0 + ) as client: # 10 minute timeout for model download + response = await client.post( + f"{self.url}/api/v1/pull", + json={"model_name": model}, + headers={"Content-Type": "application/json"}, + ) + + if response.status_code == 200: + logger.info(f"Successfully installed model: {model}") + return { + "success": True, + "message": f"Model {model} installed successfully", + } + else: + error_msg = f"Failed to install model: HTTP {response.status_code}" + logger.error(error_msg) + return {"success": False, "message": error_msg} + except httpx.TimeoutException: + error_msg = "Model installation timed out - this is a large model and may take longer" + logger.warning(error_msg) + return {"success": False, "message": error_msg} + except Exception as e: + error_msg = f"Error installing model: {e}" + logger.error(error_msg) + return {"success": False, "message": error_msg} + + async def load_model(self, model): + """ + Load a model into memory for inference operations. + + Use this to prepare an installed model for use. Models must be loaded before + they can handle chat completions or other inference requests. Only one model + can be loaded at a time. + + Args: + model: The model name/ID to load (e.g., "Qwen3-Coder-30B-A3B-Instruct-GGUF") + + Returns: + dict: Contains 'success' (bool) and 'message' (str) keys indicating + whether the model loaded successfully + """ + logger.info(f"Loading model: {model}") + + try: + async with httpx.AsyncClient( + timeout=600.0 + ) as client: # 10 minute timeout for model loading + response = await client.post( + f"{self.url}/api/v1/load", + json={"model_name": model}, + headers={"Content-Type": "application/json"}, + ) + + if response.status_code == 200: + logger.info(f"Successfully loaded model: {model}") + return { + "success": True, + "message": f"Model {model} loaded successfully", + } + else: + error_msg = f"Failed to load model: HTTP {response.status_code}" + logger.error(error_msg) + return {"success": False, "message": error_msg} + except httpx.TimeoutException: + error_msg = "Model loading timed out" + logger.warning(error_msg) + return {"success": False, "message": error_msg} + except Exception as e: + error_msg = f"Error loading model: {e}" + logger.error(error_msg) + return {"success": False, "message": error_msg} diff --git a/lemonade_arcade/main.py b/lemonade_arcade/main.py index 9278542..72c347b 100644 --- a/lemonade_arcade/main.py +++ b/lemonade_arcade/main.py @@ -9,11 +9,10 @@ import re import subprocess import sys -import tempfile import time import uuid from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, Optional import httpx import uvicorn @@ -26,10 +25,13 @@ from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates +import lemonade_arcade.lemonade_client as lc + +lemonade_handle = lc.LemonadeClient() -LEMONADE_VERSION = "8.1.5" # Pygame will be imported on-demand to avoid early DLL loading issues +# pylint: disable=invalid-name pygame = None if os.environ.get("LEMONADE_ARCADE_MODEL"): @@ -43,11 +45,16 @@ ) logger = logging.getLogger("lemonade_arcade.main") +# Suppress noisy httpcore debug messages +logging.getLogger("httpcore").setLevel(logging.WARNING) +logging.getLogger("httpx").setLevel(logging.WARNING) + def get_resource_path(relative_path): """Get absolute path to resource, works for dev and for PyInstaller""" try: # PyInstaller creates a temp folder and stores path in _MEIPASS + # pylint: disable=protected-access,no-member base_path = sys._MEIPASS # In PyInstaller bundle, resources are under lemonade_arcade/ if relative_path in ["static", "templates", "builtin_games"]: @@ -69,732 +76,131 @@ def get_resource_path(relative_path): app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") templates = Jinja2Templates(directory=str(TEMPLATES_DIR)) -# Global state -LEMONADE_SERVER_URL = "http://localhost:8000" -GAMES_DIR = Path.home() / ".lemonade-arcade" / "games" -RUNNING_GAMES: Dict[str, subprocess.Popen] = {} -GAME_METADATA: Dict[str, Dict] = {} - -# Server management state -SERVER_COMMAND = None # Track which command is used for this server instance -SERVER_PROCESS = None # Track the server process to avoid starting multiple instances - -# Ensure games directory exists -GAMES_DIR.mkdir(parents=True, exist_ok=True) - -# Load existing game metadata -METADATA_FILE = GAMES_DIR / "metadata.json" -if METADATA_FILE.exists(): - try: - with open(METADATA_FILE, "r") as f: - GAME_METADATA = json.load(f) - # Clean up old metadata format - remove descriptions - updated = False - for game_id, game_data in GAME_METADATA.items(): - if "description" in game_data: - del game_data["description"] - updated = True - # Save if we made changes - if updated: - with open(METADATA_FILE, "w") as f: - json.dump(GAME_METADATA, f, indent=2) - except Exception: - GAME_METADATA = {} - - -# Built-in games configuration -BUILTIN_GAMES = { - "builtin_snake": { - "title": "Dynamic Snake", - "created": 0, # Special marker for built-in games - "prompt": "Snake but the food moves around", - "builtin": True, - "file": "snake_moving_food.py", - }, - "builtin_invaders": { - "title": "Rainbow Space Invaders", - "created": 0, # Special marker for built-in games - "prompt": "Space invaders with rainbow colors", - "builtin": True, - "file": "rainbow_space_invaders.py", - }, -} - -# Add built-in games to metadata if not already present -for game_id, game_data in BUILTIN_GAMES.items(): - if game_id not in GAME_METADATA: - GAME_METADATA[game_id] = game_data.copy() - - -def save_metadata(): - """Save game metadata to disk.""" - try: - with open(METADATA_FILE, "w") as f: - json.dump(GAME_METADATA, f, indent=2) - except Exception as e: - print(f"Error saving metadata: {e}") - - -def is_pyinstaller_environment(): - """Check if we're running in a PyInstaller bundle.""" - return getattr(sys, "frozen", False) - - -def find_lemonade_server_paths(): - """Find actual lemonade-server installation paths by checking the environment.""" - paths = [] - - # Check current PATH for lemonade_server/bin directories - current_path = os.environ.get("PATH", "") - # Use the correct path separator for the platform - path_separator = ";" if sys.platform == "win32" else ":" - for path_entry in current_path.split(path_separator): - path_entry = path_entry.strip() - if "lemonade_server" in path_entry.lower() and "bin" in path_entry.lower(): - if os.path.exists(path_entry): - paths.append(path_entry) - logger.info(f"Found lemonade-server path in PATH: {path_entry}") - - return paths - - -def reset_server_state(): - """Reset server state when installation changes.""" - global SERVER_COMMAND, SERVER_PROCESS - logger.info("Resetting server state") - SERVER_COMMAND = None - if SERVER_PROCESS and SERVER_PROCESS.poll() is None: - try: - SERVER_PROCESS.terminate() - except: - pass - SERVER_PROCESS = None - - -def refresh_environment(): - """Refresh the current process environment variables from the system.""" - try: - import winreg - import subprocess - - logger.info("Refreshing environment variables...") - - # Get system PATH - with winreg.OpenKey( - winreg.HKEY_LOCAL_MACHINE, - r"SYSTEM\CurrentControlSet\Control\Session Manager\Environment", - ) as key: - system_path = winreg.QueryValueEx(key, "PATH")[0] - - # Get user PATH - try: - with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Environment") as key: - user_path = winreg.QueryValueEx(key, "PATH")[0] - except FileNotFoundError: - user_path = "" - - # Combine and update current process environment - new_path = system_path - if user_path: - new_path = user_path + ";" + system_path - - os.environ["PATH"] = new_path - logger.info(f"Updated PATH: {new_path[:200]}...") # Log first 200 chars - - except Exception as e: - logger.warning(f"Failed to refresh environment: {e}") - -async def execute_lemonade_server_command( - args: List[str], - timeout: int = 10, - use_popen: bool = False, - stdout_file=None, - stderr_file=None, -): +class ArcadeGames: """ - Execute a lemonade-server command with the appropriate binary/method. - - Args: - args: Command arguments (e.g., ["--version"], ["status"], ["serve"]) - timeout: Timeout in seconds for subprocess.run (ignored for Popen) - use_popen: If True, use Popen for background processes, otherwise use run - stdout_file: File object for stdout (only used with use_popen=True) - stderr_file: File object for stderr (only used with use_popen=True) - - Returns: - For subprocess.run: subprocess.CompletedProcess - For subprocess.Popen: subprocess.Popen instance - Returns None if all commands failed + Keep track of the state of saved and running games. """ - global SERVER_COMMAND - logger.info(f"Executing lemonade-server command with args: {args}") - - # If we already know which command to use, use only that one - if SERVER_COMMAND: - commands_to_try = [SERVER_COMMAND + args] - else: - # Try different ways to find lemonade-server based on platform - commands_to_try = [] - - if sys.platform == "win32": - # Windows: Try multiple options including PyInstaller and pip installs - if not is_pyinstaller_environment(): - commands_to_try.append(["lemonade-server-dev"] + args) - - # Windows traditional commands - commands_to_try.extend( - [ - ["lemonade-server"] + args, - ["lemonade-server.bat"] + args, - ] - ) - - # Add dynamically discovered Windows paths - for bin_path in find_lemonade_server_paths(): - commands_to_try.extend( - [ - [os.path.join(bin_path, "lemonade-server.exe")] + args, - [os.path.join(bin_path, "lemonade-server.bat")] + args, - ] - ) - else: - # Linux/Unix: Only lemonade-server-dev works (from pip install lemonade-sdk) - commands_to_try.append(["lemonade-server-dev"] + args) - - for i, cmd in enumerate(commands_to_try): - try: - logger.info(f"Trying command {i+1}: {cmd}") - - if use_popen: - # For background processes (like server start) - # Convert command list to string for shell=True - cmd_str = " ".join(cmd) - process = subprocess.Popen( - cmd_str, - stdout=stdout_file or subprocess.PIPE, - stderr=stderr_file or subprocess.PIPE, - creationflags=( - subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 - ), - shell=True, # Use shell=True to help with PATH resolution - env=os.environ.copy(), # Pass current environment - ) - - # Store the successful command for future use - if not SERVER_COMMAND: - SERVER_COMMAND = cmd[ - : -len(args) - ] # Remove the args to get base command - logger.info(f"Stored server command: {SERVER_COMMAND}") - - return process - else: - # For regular commands with output - # Convert command list to string for shell=True - cmd_str = " ".join(cmd) - result = subprocess.run( - cmd_str, - capture_output=True, - text=True, - timeout=timeout, - shell=True, # Use shell=True to help with PATH resolution - env=os.environ.copy(), # Pass current environment - ) - logger.info(f"Command {i+1} returned code: {result.returncode}") - logger.info(f"Command {i+1} stdout: '{result.stdout}'") - logger.info(f"Command {i+1} stderr: '{result.stderr}'") - - if result.returncode == 0: - # Store the successful command for future use - if not SERVER_COMMAND: - SERVER_COMMAND = cmd[ - : -len(args) - ] # Remove the args to get base command - logger.info(f"Stored server command: {SERVER_COMMAND}") - - return result - else: - logger.warning( - f"Command {i+1} failed with return code {result.returncode}" - ) - if result.stderr: - logger.warning(f"stderr: {result.stderr}") - # Try next command - continue - - except FileNotFoundError as e: - logger.info(f"Command {i+1} not found: {e}") - continue - except subprocess.TimeoutExpired as e: - logger.warning(f"Command {i+1} timed out: {e}") - continue - except Exception as e: - logger.error(f"Unexpected error with command {i+1}: {e}") - continue - - # If we get here, all commands failed - logger.error("All lemonade-server commands failed") - return None - - -async def check_lemonade_sdk_available(): - """Check if lemonade-sdk package is available in the current environment.""" - logger.info("Checking for lemonade-sdk package...") - try: - # Convert command list to string for shell=True - cmd = [sys.executable, "-c", "import lemonade_server; print('available')"] - cmd_str = " ".join(cmd) - result = subprocess.run( - cmd_str, - capture_output=True, - text=True, - timeout=10, - shell=True, # Use shell=True to get updated environment after pip install - ) - is_available = result.returncode == 0 and "available" in result.stdout - logger.info(f"lemonade-sdk package available: {is_available}") - return is_available - except Exception as e: - logger.info(f"lemonade-sdk package check failed: {e}") - return False - - -async def check_lemonade_server_version(): - """Check if lemonade-server is installed and get its version.""" - logger.info("Checking lemonade-server version...") - - result = await execute_lemonade_server_command(["--version"]) - - if result is None: - logger.error("All lemonade-server commands failed") - return { - "installed": False, - "version": None, - "compatible": False, - "required_version": LEMONADE_VERSION, - } - - version_line = result.stdout.strip() - logger.info(f"Raw version output: '{version_line}'") - - # Extract version number (format might be "lemonade-server 8.1.3" or just "8.1.3") - import re - - version_match = re.search(r"(\d+\.\d+\.\d+)", version_line) - if version_match: - version = version_match.group(1) - logger.info(f"Extracted version: {version}") - - # Check if the version number is allowed - version_parts = [int(x) for x in version.split(".")] - required_parts = [int(x) for x in LEMONADE_VERSION.split(".")] - is_compatible = version_parts >= required_parts - logger.info( - f"Version parts: {version_parts}, Required: {required_parts}, Compatible: {is_compatible}" - ) - - return { - "installed": True, - "version": version, - "compatible": is_compatible, - "required_version": LEMONADE_VERSION, - } - else: - logger.warning(f"Could not extract version from output: '{version_line}'") - return { - "installed": True, - "version": "unknown", - "compatible": False, - "required_version": LEMONADE_VERSION, - } - - -async def check_lemonade_server_running(): - """Check if lemonade-server is currently running.""" - logger.info("Checking if lemonade-server is running...") - - result = await execute_lemonade_server_command(["status"]) - - if result is None: - logger.error("All lemonade-server status commands failed") - return False - - output = result.stdout.strip() - logger.info(f"Status output: '{output}'") - if "Server is running" in output: - logger.info("Server is running according to status command") - return True - else: - logger.info("Server is not running according to status command") - return False - - -async def start_lemonade_server(): - """Start lemonade-server in the background.""" - global SERVER_PROCESS - logger.info("Attempting to start lemonade-server...") - - # Check if server is already running - if SERVER_PROCESS and SERVER_PROCESS.poll() is None: - logger.info("Server process is already running") - return {"success": True, "message": "Server is already running"} - - # Create temp files to capture output for debugging - import tempfile - stdout_file = tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".log") - stderr_file = tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".log") + def __init__(self): - # Use the unified function to start the server - process = await execute_lemonade_server_command( - ["serve"], use_popen=True, stdout_file=stdout_file, stderr_file=stderr_file - ) - - if process is None: - logger.error("All lemonade-server start commands failed") - stdout_file.close() - stderr_file.close() - try: - os.unlink(stdout_file.name) - os.unlink(stderr_file.name) - except: - pass - return { - "success": False, - "message": "Failed to start server: all commands failed", - } - - # Give the process a moment to start and check if it's still running - import time - - time.sleep(1) + # Global state + self.games_dir = Path.home() / ".lemonade-arcade" / "games" + self.running_games: Dict[str, subprocess.Popen] = {} + self.game_metadata: Dict[str, Dict] = {} - # Check if process is still alive - if process.poll() is None: - logger.info(f"Successfully started lemonade-server with PID: {process.pid}") - SERVER_PROCESS = process - - # Close temp files - stdout_file.close() - stderr_file.close() - - return {"success": True, "message": "Server start command issued"} - else: - # Process died immediately, check error output - stdout_file.close() - stderr_file.close() - - # Read the error output - try: - with open(stderr_file.name, "r") as f: - stderr_content = f.read().strip() - with open(stdout_file.name, "r") as f: - stdout_content = f.read().strip() - - logger.error( - f"Server failed immediately. Return code: {process.returncode}" - ) - if stderr_content: - logger.error(f"Stderr: {stderr_content}") - if stdout_content: - logger.info(f"Stdout: {stdout_content}") + # Ensure games directory exists + self.games_dir.mkdir(parents=True, exist_ok=True) - # Clean up temp files + # Load existing game metadata + self.metadata_file = self.games_dir / "metadata.json" + if self.metadata_file.exists(): try: - os.unlink(stdout_file.name) - os.unlink(stderr_file.name) - except: - pass - - except Exception as read_error: - logger.error(f"Could not read process output: {read_error}") - - return {"success": False, "message": "Server process died immediately"} - - -async def install_lemonade_sdk_package(): - """Install lemonade-sdk package using pip.""" - try: - logger.info("Installing lemonade-sdk package using pip...") - - # Install the package - result = subprocess.run( - [sys.executable, "-m", "pip", "install", "lemonade-sdk"], - capture_output=True, - text=True, - timeout=300, # 5 minutes timeout - ) - - if result.returncode == 0: - logger.info("lemonade-sdk package installed successfully") - return { - "success": True, - "message": "lemonade-sdk package installed successfully. You can now use 'lemonade-server-dev' command.", - } - else: - error_msg = result.stderr or result.stdout or "Unknown installation error" - logger.error(f"pip install failed: {error_msg}") - return {"success": False, "message": f"pip install failed: {error_msg}"} - - except Exception as e: - logger.error(f"Failed to install lemonade-sdk package: {e}") - return {"success": False, "message": f"Failed to install: {e}"} - - -async def download_and_install_lemonade_server(): - """Download and install lemonade-server using the appropriate method.""" - - # Reset server state since we're installing/updating - reset_server_state() - - # If not in PyInstaller environment, prefer pip installation - if not is_pyinstaller_environment(): - logger.info( - "Development environment detected, attempting pip installation first..." - ) - pip_result = await install_lemonade_sdk_package() - if pip_result["success"]: - return pip_result - else: - logger.info( - "pip installation failed, falling back to GitHub instructions..." - ) - return { - "success": False, - "message": "Could not install lemonade-sdk package. Please visit https://github.com/lemonade-sdk/lemonade for installation instructions.", - "github_link": "https://github.com/lemonade-sdk/lemonade", - } - - # PyInstaller environment or fallback - use installer for Windows - try: - # Download the installer - installer_url = "https://github.com/lemonade-sdk/lemonade/releases/latest/download/Lemonade_Server_Installer.exe" - - # Create temp directory for installer - temp_dir = tempfile.mkdtemp() - installer_path = os.path.join(temp_dir, "Lemonade_Server_Installer.exe") - - logger.info(f"Downloading installer from {installer_url}") - - # Download with progress tracking - async with httpx.AsyncClient(timeout=300.0, follow_redirects=True) as client: - async with client.stream("GET", installer_url) as response: - if response.status_code != 200: - return { - "success": False, - "message": f"Failed to download installer: HTTP {response.status_code}", - } - - with open(installer_path, "wb") as f: - async for chunk in response.aiter_bytes(8192): - f.write(chunk) - - logger.info(f"Downloaded installer to {installer_path}") - - # Run interactive installation (not silent) - install_cmd = [installer_path] - - logger.info(f"Running interactive installation: {' '.join(install_cmd)}") - - # Start the installer but don't wait for it to complete - # This allows the user to see the installation UI - process = subprocess.Popen(install_cmd) - - return { - "success": True, - "message": "Installer launched. Please complete the installation and then restart Lemonade Arcade.", - "interactive": True, + with open(self.metadata_file, "r", encoding="utf-8") as metadata_file: + self.game_metadata = json.load(metadata_file) + except Exception: + self.game_metadata = {} + + # Built-in games configuration + self.BUILTIN_GAMES = { + "builtin_snake": { + "title": "Dynamic Snake", + "created": 0, # Special marker for built-in games + "prompt": "Snake but the food moves around", + "builtin": True, + "file": "snake_moving_food.py", + }, + "builtin_invaders": { + "title": "Rainbow Space Invaders", + "created": 0, # Special marker for built-in games + "prompt": "Space invaders with rainbow colors", + "builtin": True, + "file": "rainbow_space_invaders.py", + }, } - except Exception as e: - logger.error(f"Failed to download/install lemonade-server: {e}") - return {"success": False, "message": f"Failed to install: {e}"} - - -async def check_lemonade_server(): - """Check if Lemonade Server is running.""" - logger.info(f"Checking Lemonade Server at {LEMONADE_SERVER_URL}") + # Add built-in games to metadata if not already present + for game_id, game_data in self.BUILTIN_GAMES.items(): + if game_id not in self.game_metadata: + self.game_metadata[game_id] = game_data.copy() - # Try multiple times with increasing delays to give server time to start - for attempt in range(3): + def save_metadata(self): + """Save game metadata to disk.""" try: - # Use a longer timeout and retry logic for more robust checking - async with httpx.AsyncClient(timeout=15.0) as client: - response = await client.get(f"{LEMONADE_SERVER_URL}/api/v1/models") - logger.info( - f"Server check attempt {attempt + 1} response status: {response.status_code}" - ) - if response.status_code == 200: - return True - elif response.status_code == 404: - # Try the health endpoint if models endpoint doesn't exist - logger.info("Models endpoint not found, trying health endpoint") - try: - health_response = await client.get( - f"{LEMONADE_SERVER_URL}/health" - ) - logger.info( - f"Health check response status: {health_response.status_code}" - ) - return health_response.status_code == 200 - except Exception as e: - logger.info(f"Health check failed: {e}") - - except httpx.TimeoutException: - logger.info( - f"Server check attempt {attempt + 1} timed out - server might be starting up" - ) - except httpx.ConnectError as e: - logger.info(f"Server check attempt {attempt + 1} connection failed: {e}") + with open(self.metadata_file, "w", encoding="utf-8") as f: + json.dump(self.game_metadata, f, indent=2) except Exception as e: - logger.info(f"Server check attempt {attempt + 1} failed: {e}") - - # Wait before next attempt (except on last attempt) - if attempt < 2: - import asyncio - - await asyncio.sleep(2) - - logger.info("All server check attempts failed") - return False - - -async def get_available_models(): - """Get list of available models from Lemonade Server.""" - logger.info("Getting available models from Lemonade Server") - try: - async with httpx.AsyncClient(timeout=10.0) as client: - response = await client.get(f"{LEMONADE_SERVER_URL}/api/v1/models") - if response.status_code == 200: - data = response.json() - models = [model["id"] for model in data.get("data", [])] - logger.info(f"Found {len(models)} available models: {models}") - return models - else: - logger.warning(f"Failed to get models, status: {response.status_code}") - except Exception as e: - logger.info(f"Error getting models: {e}") - return [] - - -async def check_required_model(): - """Check if the required model is installed.""" - logger.info(f"Checking for required model: {REQUIRED_MODEL}") - - try: - models = await get_available_models() - is_installed = REQUIRED_MODEL in models - logger.info(f"Required model installed: {is_installed}") - return {"installed": is_installed, "model_name": REQUIRED_MODEL} - except Exception as e: - logger.error(f"Error checking required model: {e}") - return {"installed": False, "model_name": REQUIRED_MODEL} - - -async def check_model_loaded(): - """Check if the required model is currently loaded.""" - logger.info(f"Checking if model is loaded: {REQUIRED_MODEL}") + print(f"Error saving metadata: {e}") + + def launch_game(self, game_id: str): + """Launch a game in a separate process.""" + logger.debug(f"Attempting to launch game {game_id}") + + # Check if it's a built-in game + if game_id in self.BUILTIN_GAMES: + # For built-in games, use the file from the builtin_games directory + builtin_games_dir = get_resource_path("builtin_games") + game_file = Path(builtin_games_dir) / self.BUILTIN_GAMES[game_id]["file"] + logger.debug(f"Looking for built-in game file at: {game_file}") + else: + # For user-generated games, use the standard games directory + game_file = self.games_dir / f"{game_id}.py" + logger.debug(f"Looking for user game file at: {game_file}") - try: - async with httpx.AsyncClient(timeout=30.0) as client: - response = await client.get(f"{LEMONADE_SERVER_URL}/api/v1/health") + if not game_file.exists(): + logger.error(f"Game file not found: {game_file}") + raise FileNotFoundError(f"Game file not found: {game_file}") - if response.status_code == 200: - status_data = response.json() - # Check if the required model is the currently loaded model - loaded_model = status_data.get("model_loaded", "") - is_loaded = loaded_model == REQUIRED_MODEL - logger.info( - f"Model loaded status: {is_loaded}, current model: {loaded_model}" - ) - return { - "loaded": is_loaded, - "model_name": REQUIRED_MODEL, - "current_model": loaded_model, - } + # Launch the game + try: + # In PyInstaller environment, use the same executable with the game file as argument + # This ensures the game runs with the same DLL configuration + if getattr(sys, "frozen", False): + # We're in PyInstaller - use the same executable that has the SDL2 DLLs + cmd = [sys.executable, str(game_file)] + logger.debug(f"PyInstaller mode - Launching: {' '.join(cmd)}") else: - logger.warning( - f"Failed to get server status: HTTP {response.status_code}" - ) - return { - "loaded": False, - "model_name": REQUIRED_MODEL, - "current_model": None, - } - except Exception as e: - logger.error(f"Error checking model loaded status: {e}") - return {"loaded": False, "model_name": REQUIRED_MODEL, "current_model": None} - - -async def install_required_model(): - """Install the required model using the pull endpoint.""" - logger.info(f"Installing required model: {REQUIRED_MODEL}") - - try: - async with httpx.AsyncClient( - timeout=600.0 - ) as client: # 10 minute timeout for model download - response = await client.post( - f"{LEMONADE_SERVER_URL}/api/v1/pull", - json={"model_name": REQUIRED_MODEL}, - headers={"Content-Type": "application/json"}, - ) + # Development mode - use regular Python + cmd = [sys.executable, str(game_file)] + logger.debug(f"Development mode - Launching: {' '.join(cmd)}") + + # pylint: disable=consider-using-with + process = subprocess.Popen(cmd) + self.running_games[game_id] = process + logger.debug(f"Game {game_id} launched successfully with PID {process.pid}") + return True + except Exception as e: + logger.error(f"Error launching game {game_id}: {e}") + return False - if response.status_code == 200: - logger.info(f"Successfully installed model: {REQUIRED_MODEL}") - return { - "success": True, - "message": f"Model {REQUIRED_MODEL} installed successfully", - } - else: - error_msg = f"Failed to install model: HTTP {response.status_code}" - logger.error(error_msg) - return {"success": False, "message": error_msg} - except httpx.TimeoutException: - error_msg = ( - "Model installation timed out - this is a large model and may take longer" - ) - logger.warning(error_msg) - return {"success": False, "message": error_msg} - except Exception as e: - error_msg = f"Error installing model: {e}" - logger.error(error_msg) - return {"success": False, "message": error_msg} + def stop_game(self, game_id: str): + """Stop a running game.""" + if game_id in self.running_games: + try: + process = self.running_games[game_id] + process.terminate() + # Wait a bit for graceful termination + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + except Exception as e: + print(f"Error stopping game {game_id}: {e}") + finally: + del self.running_games[game_id] + def cleanup_finished_games(self): + """Clean up finished game processes.""" + finished = [] + for game_id, process in self.running_games.items(): + if process.poll() is not None: # Process has finished + finished.append(game_id) -async def load_required_model(): - """Load the required model using the load endpoint.""" - logger.info(f"Loading required model: {REQUIRED_MODEL}") + for game_id in finished: + del self.running_games[game_id] - try: - async with httpx.AsyncClient( - timeout=600.0 - ) as client: # 10 minute timeout for model loading - response = await client.post( - f"{LEMONADE_SERVER_URL}/api/v1/load", - json={"model_name": REQUIRED_MODEL}, - headers={"Content-Type": "application/json"}, - ) - if response.status_code == 200: - logger.info(f"Successfully loaded model: {REQUIRED_MODEL}") - return { - "success": True, - "message": f"Model {REQUIRED_MODEL} loaded successfully", - } - else: - error_msg = f"Failed to load model: HTTP {response.status_code}" - logger.error(error_msg) - return {"success": False, "message": error_msg} - except httpx.TimeoutException: - error_msg = "Model loading timed out" - logger.warning(error_msg) - return {"success": False, "message": error_msg} - except Exception as e: - error_msg = f"Error loading model: {e}" - logger.error(error_msg) - return {"success": False, "message": error_msg} +arcade_games = ArcadeGames() async def generate_game_title(prompt: str) -> str: @@ -802,6 +208,7 @@ async def generate_game_title(prompt: str) -> str: logger.debug(f"Generating title for prompt: {prompt[:50]}...") try: + # pylint: disable=line-too-long title_prompt = f"""Generate a short game title (2-3 words maximum) for this game concept: "{prompt}" Requirements: @@ -822,7 +229,7 @@ async def generate_game_title(prompt: str) -> str: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( - f"{LEMONADE_SERVER_URL}/api/v1/chat/completions", + f"{lemonade_handle.url}/api/v1/chat/completions", json={ "model": REQUIRED_MODEL, "messages": messages, @@ -887,75 +294,6 @@ def generate_game_id(): return str(uuid.uuid4())[:8] -def launch_game(game_id: str): - """Launch a game in a separate process.""" - logger.debug(f"Attempting to launch game {game_id}") - - # Check if it's a built-in game - if game_id in BUILTIN_GAMES: - # For built-in games, use the file from the builtin_games directory - builtin_games_dir = get_resource_path("builtin_games") - game_file = Path(builtin_games_dir) / BUILTIN_GAMES[game_id]["file"] - logger.debug(f"Looking for built-in game file at: {game_file}") - else: - # For user-generated games, use the standard games directory - game_file = GAMES_DIR / f"{game_id}.py" - logger.debug(f"Looking for user game file at: {game_file}") - - if not game_file.exists(): - logger.error(f"Game file not found: {game_file}") - raise FileNotFoundError(f"Game file not found: {game_file}") - - # Launch the game - try: - # In PyInstaller environment, use the same executable with the game file as argument - # This ensures the game runs with the same DLL configuration - if getattr(sys, "frozen", False): - # We're in PyInstaller - use the same executable that has the SDL2 DLLs - cmd = [sys.executable, str(game_file)] - logger.debug(f"PyInstaller mode - Launching: {' '.join(cmd)}") - else: - # Development mode - use regular Python - cmd = [sys.executable, str(game_file)] - logger.debug(f"Development mode - Launching: {' '.join(cmd)}") - - process = subprocess.Popen(cmd) - RUNNING_GAMES[game_id] = process - logger.debug(f"Game {game_id} launched successfully with PID {process.pid}") - return True - except Exception as e: - logger.error(f"Error launching game {game_id}: {e}") - return False - - -def stop_game(game_id: str): - """Stop a running game.""" - if game_id in RUNNING_GAMES: - try: - process = RUNNING_GAMES[game_id] - process.terminate() - # Wait a bit for graceful termination - try: - process.wait(timeout=5) - except subprocess.TimeoutExpired: - process.kill() - except Exception as e: - print(f"Error stopping game {game_id}: {e}") - finally: - del RUNNING_GAMES[game_id] - - -def cleanup_finished_games(): - """Clean up finished game processes.""" - finished = [] - for game_id, process in RUNNING_GAMES.items(): - if process.poll() is not None: # Process has finished - finished.append(game_id) - - for game_id in finished: - del RUNNING_GAMES[game_id] - - @app.get("/") async def root(request: Request): """Serve the main HTML page.""" @@ -971,29 +309,22 @@ async def favicon(): @app.get("/api/server-status") async def server_status(): """Check if Lemonade Server is online.""" - online = await check_lemonade_server() + online = await lemonade_handle.check_lemonade_server_api() return JSONResponse({"online": online}) -@app.get("/api/models") -async def get_models(): - """Get available models from Lemonade Server.""" - models = await get_available_models() - return JSONResponse(models) - - @app.get("/api/games") async def get_games(): """Get all saved games.""" - cleanup_finished_games() - return JSONResponse(GAME_METADATA) + arcade_games.cleanup_finished_games() + return JSONResponse(arcade_games.game_metadata) @app.get("/api/installation-status") async def installation_status(): """Check lemonade-server installation status ONLY.""" logger.info("Installation status endpoint called") - version_info = await check_lemonade_server_version() + version_info = await lemonade_handle.check_lemonade_server_version() logger.info(f"Version check result: {version_info}") result = { @@ -1012,13 +343,13 @@ async def server_running_status(): logger.info("=== Server running status endpoint called ===") # Check if server is currently running - is_running = await check_lemonade_server_running() + is_running = await lemonade_handle.check_lemonade_server_running() logger.info(f"Initial running check result: {is_running}") # If server is not running, try to start it automatically if not is_running: logger.info("Server not running, attempting to start automatically...") - start_result = await start_lemonade_server() + start_result = await lemonade_handle.start_lemonade_server() logger.info(f"Auto-start result: {start_result}") if start_result["success"]: @@ -1029,7 +360,7 @@ async def server_running_status(): await asyncio.sleep(2) # Check again - is_running = await check_lemonade_server_running() + is_running = await lemonade_handle.check_lemonade_server_running() logger.info(f"Running check after auto-start: {is_running}") else: logger.warning( @@ -1047,7 +378,7 @@ async def server_running_status(): async def api_connection_status(): """Check API connection status ONLY.""" logger.info("=== API connection status endpoint called ===") - api_online = await check_lemonade_server() + api_online = await lemonade_handle.check_lemonade_server_api() logger.info(f"API online check result: {api_online}") result = { @@ -1061,7 +392,7 @@ async def api_connection_status(): async def model_installation_status(): """Check if required model is installed ONLY.""" logger.info("Model installation status endpoint called") - model_status = await check_required_model() + model_status = await lemonade_handle.check_model_installed(REQUIRED_MODEL) logger.info(f"Model check result: {model_status}") result = { @@ -1076,7 +407,7 @@ async def model_installation_status(): async def model_loading_status(): """Check if required model is loaded ONLY.""" logger.info("Model loading status endpoint called") - model_loaded_status = await check_model_loaded() + model_loaded_status = await lemonade_handle.check_model_loaded(REQUIRED_MODEL) logger.info(f"Model loaded check result: {model_loaded_status}") result = { @@ -1093,9 +424,11 @@ async def installation_environment(): """Check installation environment and available methods.""" logger.info("Installation environment endpoint called") - is_pyinstaller = is_pyinstaller_environment() + is_pyinstaller = lemonade_handle.is_pyinstaller_environment() sdk_available = ( - await check_lemonade_sdk_available() if not is_pyinstaller else False + await lemonade_handle.check_lemonade_sdk_available() + if not is_pyinstaller + else False ) result = { @@ -1114,9 +447,9 @@ async def refresh_environment_endpoint(): """Refresh environment variables after installation.""" logger.info("Refresh environment endpoint called") try: - refresh_environment() + lemonade_handle.refresh_environment() # Also reset server state so it will re-discover commands - reset_server_state() + lemonade_handle.reset_server_state() return JSONResponse({"success": True, "message": "Environment refreshed"}) except Exception as e: logger.error(f"Failed to refresh environment: {e}") @@ -1129,7 +462,7 @@ async def refresh_environment_endpoint(): async def install_server(): """Download and install lemonade-server.""" logger.info("Install server endpoint called") - result = await download_and_install_lemonade_server() + result = await lemonade_handle.download_and_install_lemonade_server() logger.info(f"Install result: {result}") return JSONResponse(result) @@ -1138,7 +471,7 @@ async def install_server(): async def start_server(): """Start lemonade-server if installed.""" logger.info("Start server endpoint called") - result = await start_lemonade_server() + result = await lemonade_handle.start_lemonade_server() logger.info(f"Start server result: {result}") return JSONResponse(result) @@ -1147,7 +480,7 @@ async def start_server(): async def install_model(): """Install the required model.""" logger.info("Install model endpoint called") - result = await install_required_model() + result = await lemonade_handle.install_model(REQUIRED_MODEL) logger.info(f"Install model result: {result}") return JSONResponse(result) @@ -1156,7 +489,7 @@ async def install_model(): async def load_model(): """Load the required model.""" logger.info("Load model endpoint called") - result = await load_required_model() + result = await lemonade_handle.load_model(REQUIRED_MODEL) logger.info(f"Load model result: {result}") return JSONResponse(result) @@ -1187,6 +520,7 @@ async def generate(): logger.debug("Sent 'Connecting to LLM...' status") # Prepare the system prompt for game generation + # pylint: disable=line-too-long system_prompt = """You are an expert Python game developer. Generate a complete, working Python game using pygame based on the user's description. Rules: @@ -1213,12 +547,12 @@ async def generate(): # Stream response from Lemonade Server logger.debug( - f"Starting request to {LEMONADE_SERVER_URL}/api/v1/chat/completions" + f"Starting request to {lemonade_handle.url}/api/v1/chat/completions" ) async with httpx.AsyncClient(timeout=600.0) as client: async with client.stream( "POST", - f"{LEMONADE_SERVER_URL}/api/v1/chat/completions", + f"{lemonade_handle.url}/api/v1/chat/completions", json={ "model": REQUIRED_MODEL, "messages": messages, @@ -1297,7 +631,7 @@ async def generate(): ) # Save the game - game_file = GAMES_DIR / f"{game_id}.py" + game_file = arcade_games.games_dir / f"{game_id}.py" logger.debug(f"Saving game to: {game_file}") with open(game_file, "w", encoding="utf-8") as f: f.write(python_code) @@ -1310,19 +644,19 @@ async def generate(): game_title = await generate_game_title(prompt) # Save metadata - GAME_METADATA[game_id] = { + arcade_games.game_metadata[game_id] = { "title": game_title, "created": time.time(), "prompt": prompt, } - save_metadata() + arcade_games.save_metadata() logger.debug(f"Saved metadata for game: {game_title}") yield f"data: {json.dumps({'type': 'status', 'message': 'Launching game...'})}\n\n" logger.debug("Starting game launch") # Launch the game - if launch_game(game_id): + if arcade_games.launch_game(game_id): logger.debug(f"Game {game_id} launched successfully") yield f"data: {json.dumps({'type': 'complete', 'game_id': game_id, 'message': 'Game created and launched!'})}\n\n" else: @@ -1347,15 +681,15 @@ async def generate(): @app.post("/api/launch-game/{game_id}") async def launch_game_endpoint(game_id: str): """Launch a specific game.""" - cleanup_finished_games() + arcade_games.cleanup_finished_games() - if RUNNING_GAMES: + if arcade_games.running_games: raise HTTPException(status_code=400, detail="Another game is already running") - if game_id not in GAME_METADATA: + if game_id not in arcade_games.game_metadata: raise HTTPException(status_code=404, detail="Game not found") - success = launch_game(game_id) + success = arcade_games.launch_game(game_id) if not success: raise HTTPException(status_code=500, detail="Failed to launch game") @@ -1365,33 +699,33 @@ async def launch_game_endpoint(game_id: str): @app.get("/api/game-status/{game_id}") async def game_status(game_id: str): """Check if a game is currently running.""" - cleanup_finished_games() - running = game_id in RUNNING_GAMES + arcade_games.cleanup_finished_games() + running = game_id in arcade_games.running_games return JSONResponse({"running": running}) @app.delete("/api/delete-game/{game_id}") async def delete_game_endpoint(game_id: str): """Delete a game.""" - if game_id not in GAME_METADATA: + if game_id not in arcade_games.game_metadata: raise HTTPException(status_code=404, detail="Game not found") # Prevent deletion of built-in games - if game_id in BUILTIN_GAMES: + if game_id in arcade_games.BUILTIN_GAMES: raise HTTPException(status_code=403, detail="Cannot delete built-in games") # Stop the game if it's running - if game_id in RUNNING_GAMES: - stop_game(game_id) + if game_id in arcade_games.running_games: + arcade_games.stop_game(game_id) # Delete the file - game_file = GAMES_DIR / f"{game_id}.py" + game_file = arcade_games.games_dir / f"{game_id}.py" if game_file.exists(): game_file.unlink() # Remove from metadata - del GAME_METADATA[game_id] - save_metadata() + del arcade_games.game_metadata[game_id] + arcade_games.save_metadata() return JSONResponse({"success": True}) @@ -1399,13 +733,13 @@ async def delete_game_endpoint(game_id: str): @app.get("/api/game-metadata/{game_id}") async def get_game_metadata(game_id: str): """Get metadata for a specific game.""" - if game_id not in GAME_METADATA: + if game_id not in arcade_games.game_metadata: raise HTTPException(status_code=404, detail="Game not found") - metadata = GAME_METADATA[game_id].copy() + metadata = arcade_games.game_metadata[game_id].copy() # For built-in games, hide sensitive information - if game_id in BUILTIN_GAMES: + if game_id in arcade_games.BUILTIN_GAMES: # Remove prompt and other sensitive data for built-in games metadata.pop("prompt", None) metadata["builtin"] = True @@ -1416,23 +750,20 @@ async def get_game_metadata(game_id: str): @app.post("/api/open-game-file/{game_id}") async def open_game_file(game_id: str): """Open the Python file for a game in the default editor.""" - if game_id not in GAME_METADATA: + if game_id not in arcade_games.game_metadata: raise HTTPException(status_code=404, detail="Game not found") # Prevent opening built-in game files - if game_id in BUILTIN_GAMES: + if game_id in arcade_games.BUILTIN_GAMES: raise HTTPException( status_code=403, detail="Cannot view source code of built-in games" ) - game_file = GAMES_DIR / f"{game_id}.py" + game_file = arcade_games.games_dir / f"{game_id}.py" if not game_file.exists(): raise HTTPException(status_code=404, detail="Game file not found") try: - import subprocess - import sys - # Try to open with the default program (works on Windows, macOS, Linux) if sys.platform.startswith("win"): subprocess.run(["start", str(game_file)], shell=True, check=True) @@ -1444,7 +775,9 @@ async def open_game_file(game_id: str): return JSONResponse({"success": True, "message": "File opened"}) except Exception as e: logger.error(f"Failed to open file {game_file}: {e}") - raise HTTPException(status_code=500, detail=f"Failed to open file: {str(e)}") + raise HTTPException( + status_code=500, detail=f"Failed to open file: {str(e)}" + ) from e def run_game_file(game_file_path): @@ -1453,9 +786,11 @@ def run_game_file(game_file_path): print(f"Lemonade Arcade - Running game: {game_file_path}") # Import pygame here, right before we need it + # pylint: disable=global-statement global pygame if pygame is None: try: + # pylint: disable=redefined-outer-name import pygame print(f"Pygame {pygame.version.ver} loaded successfully") @@ -1468,6 +803,7 @@ def run_game_file(game_file_path): game_code = f.read() # Execute the game code - pygame should now be available + # pylint: disable=exec-used exec(game_code, {"__name__": "__main__", "__file__": game_file_path}) except Exception as e: @@ -1521,8 +857,8 @@ def run_server(): except KeyboardInterrupt: print("\nShutting down Lemonade Arcade...") # Clean up any running games - for game_id in list(RUNNING_GAMES.keys()): - stop_game(game_id) + for game_id in list(arcade_games.running_games.keys()): + arcade_games.stop_game(game_id) if __name__ == "__main__": diff --git a/test/lemonade_client_integration.py b/test/lemonade_client_integration.py new file mode 100644 index 0000000..f31716f --- /dev/null +++ b/test/lemonade_client_integration.py @@ -0,0 +1,341 @@ +import unittest +import asyncio +import sys +import os +import time +import subprocess +import platform +import tempfile +import signal +from pathlib import Path + +# Add the lemonade_arcade package to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "lemonade_arcade")) +from lemonade_arcade.lemonade_client import LemonadeClient, LEMONADE_MINIMUM_VERSION + + +class TestLemonadeClientIntegration(unittest.TestCase): + """Integration tests for LemonadeClient that actually interact with a real Lemonade server. + + These tests will: + 1. Install lemonade-sdk if not available + 2. Install lemonade-server if needed + 3. Start the server and test real API interactions + 4. Test model installation and loading + 5. Clean up server processes when done + """ + + @classmethod + def setUpClass(cls): + """Set up test fixtures for the entire test class.""" + cls.client = LemonadeClient() + cls.server_started = False + cls.test_model = "Qwen3-0.6B-GGUF" # Small model for testing + cls.setup_timeout = 300 # 5 minutes for setup + cls.test_timeout = 60 # 1 minute for individual tests + + @classmethod + def tearDownClass(cls): + """Clean up after all tests.""" + if cls.server_started and cls.client.server_process: + try: + cls.client.server_process.terminate() + cls.client.server_process.wait(timeout=10) + except Exception as e: + print(f"Warning: Failed to cleanly stop server: {e}") + if ( + cls.client.server_process + and cls.client.server_process.poll() is None + ): + try: + cls.client.server_process.kill() + except Exception: + pass + + async def async_setUp(self): + """Async setup for each test.""" + # Reset client state + self.client.reset_server_state() + + def setUp(self): + """Set up for each test.""" + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.loop.run_until_complete(self.async_setUp()) + + def tearDown(self): + """Clean up after each test.""" + if self.loop: + # Clean up any pending tasks + pending = asyncio.all_tasks(self.loop) + for task in pending: + task.cancel() + if pending: + self.loop.run_until_complete( + asyncio.gather(*pending, return_exceptions=True) + ) + self.loop.close() + + def run_async(self, coro, timeout=None): + """Helper to run async functions in tests.""" + if timeout is None: + timeout = self.test_timeout + return asyncio.wait_for(coro, timeout=timeout) + + def test_01_install_and_check_lemonade_sdk(self): + """Test installing lemonade-sdk if needed and checking if it's available.""" + # First check if it's already available + result = self.loop.run_until_complete( + self.run_async(self.client.check_lemonade_sdk_available()) + ) + + if not result: + print("Installing lemonade-sdk...") + # Install lemonade-sdk using pip + install_result = self.loop.run_until_complete( + self.run_async( + self.client.install_lemonade_sdk_package(), + timeout=self.setup_timeout, + ) + ) + + self.assertTrue( + install_result["success"], + f"lemonade-sdk installation should succeed: {install_result.get('message', '')}", + ) + + # Reset server state and refresh environment after installation + print("Refreshing environment after installation...") + self.client.reset_server_state() + self.client.refresh_environment() + + # Wait a moment for environment changes to take effect + import time + + time.sleep(2) + + # Verify it's now available + result_after = self.loop.run_until_complete( + self.run_async(self.client.check_lemonade_sdk_available()) + ) + self.assertTrue( + result_after, "lemonade-sdk should be available after installation" + ) + else: + print("lemonade-sdk already available") + + # The final check - either it was already available or we installed it successfully + final_result = result or result_after if not result else True + self.assertTrue(final_result, "lemonade-sdk should be available") + + def test_02_check_lemonade_server_version(self): + """Test checking lemonade server version.""" + result = self.loop.run_until_complete( + self.run_async(self.client.check_lemonade_server_version()) + ) + + self.assertIsInstance(result, dict) + self.assertIn("installed", result) + self.assertIn("version", result) + self.assertIn("compatible", result) + self.assertIn("required_version", result) + + self.assertTrue( + result["compatible"], + f"Server version {result['version']} should be compatible with minimum {result['required_version']}", + ) + + def test_03_start_lemonade_server(self): + """Test starting lemonade server.""" + # Check if server is already running + running = self.loop.run_until_complete( + self.run_async(self.client.check_lemonade_server_running()) + ) + + if not running: + print("Starting lemonade server...") + result = self.loop.run_until_complete( + self.run_async( + self.client.start_lemonade_server(), timeout=self.setup_timeout + ) + ) + + self.assertTrue( + result["success"], + f"Server start should succeed: {result.get('message', '')}", + ) + + if result["success"]: + self.__class__.server_started = True + + # Wait for server to be fully up + print("Waiting for server to be ready...") + max_wait = 120 # 2 minutes + wait_start = time.time() + + while time.time() - wait_start < max_wait: + try: + api_online = self.loop.run_until_complete( + self.run_async( + self.client.check_lemonade_server_api(), timeout=10 + ) + ) + if api_online: + print("Server API is ready!") + break + except Exception as e: + print(f"Waiting for server... ({e})") + + time.sleep(5) + else: + self.fail("Server did not become ready within timeout") + + def test_04_check_lemonade_server_api(self): + """Test checking if lemonade server API is responding.""" + result = self.loop.run_until_complete( + self.run_async(self.client.check_lemonade_server_api()) + ) + self.assertTrue(result, "Server API should be responding") + + def test_05_get_available_models(self): + """Test getting available models from server.""" + models = self.loop.run_until_complete( + self.run_async(self.client.get_available_models()) + ) + self.assertIsInstance(models, list, "Should return a list of models") + # Note: List might be empty if no models are installed yet + + def test_06_install_model(self): + """Test installing a model.""" + # First check if model is already installed + check_result = self.loop.run_until_complete( + self.run_async(self.client.check_model_installed(self.test_model)) + ) + + if not check_result["installed"]: + print(f"Installing test model: {self.test_model}") + result = self.loop.run_until_complete( + self.run_async( + self.client.install_model(self.test_model), + timeout=self.setup_timeout, + ) + ) + + self.assertTrue( + result["success"], + f"Model installation should succeed: {result.get('message', '')}", + ) + + # Verify model is now installed + check_result_after = self.loop.run_until_complete( + self.run_async(self.client.check_model_installed(self.test_model)) + ) + self.assertTrue( + check_result_after["installed"], + "Model should be installed after installation", + ) + else: + print(f"Test model {self.test_model} already installed") + + def test_07_check_model_installed(self): + """Test checking if a model is installed.""" + result = self.loop.run_until_complete( + self.run_async(self.client.check_model_installed(self.test_model)) + ) + + self.assertIsInstance(result, dict) + self.assertIn("installed", result) + self.assertIn("model_name", result) + self.assertEqual(result["model_name"], self.test_model) + + def test_08_load_model(self): + """Test loading a model.""" + # Ensure model is installed first + check_result = self.loop.run_until_complete( + self.run_async(self.client.check_model_installed(self.test_model)) + ) + + if check_result["installed"]: + print(f"Loading test model: {self.test_model}") + result = self.loop.run_until_complete( + self.run_async( + self.client.load_model(self.test_model), timeout=self.setup_timeout + ) + ) + + self.assertTrue( + result["success"], + f"Model loading should succeed: {result.get('message', '')}", + ) + else: + self.skipTest(f"Test model {self.test_model} not installed") + + def test_09_check_model_loaded(self): + """Test checking if a model is loaded.""" + result = self.loop.run_until_complete( + self.run_async(self.client.check_model_loaded(self.test_model)) + ) + + self.assertIsInstance(result, dict) + self.assertIn("loaded", result) + self.assertIn("model_name", result) + self.assertIn("current_model", result) + self.assertEqual(result["model_name"], self.test_model) + + +def run_async_test(coro): + """Helper function to run async tests.""" + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + return loop.run_until_complete(coro) + finally: + # Clean up any pending tasks + pending = asyncio.all_tasks(loop) + for task in pending: + task.cancel() + if pending: + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + + +# Convert async test methods to sync for unittest +async_test_methods = [] +for attr_name in dir(TestLemonadeClientIntegration): + attr = getattr(TestLemonadeClientIntegration, attr_name) + if ( + callable(attr) + and attr_name.startswith("test_") + and asyncio.iscoroutinefunction(attr) + ): + async_test_methods.append((attr_name, attr)) + +# Apply the conversion with proper closure handling +for attr_name, original_method in async_test_methods: + + def make_sync_test(method): + def sync_test(self): + return run_async_test(method(self)) + + return sync_test + + setattr(TestLemonadeClientIntegration, attr_name, make_sync_test(original_method)) + + +if __name__ == "__main__": + # Set up logging to see what's happening + import logging + + logging.basicConfig( + level=logging.DEBUG + ) # Enable DEBUG level to see command details + + # Suppress noisy httpcore debug messages + logging.getLogger("httpcore").setLevel(logging.WARNING) + logging.getLogger("httpx").setLevel(logging.WARNING) + + # Run with verbose output + unittest.main(verbosity=2) diff --git a/test/lemonade_client_unit.py b/test/lemonade_client_unit.py new file mode 100644 index 0000000..b2bb4ab --- /dev/null +++ b/test/lemonade_client_unit.py @@ -0,0 +1,952 @@ +import unittest +from unittest.mock import patch, MagicMock, AsyncMock, mock_open +import asyncio +import sys +import os +import subprocess +import tempfile +import httpx +import platform + +# Add the lemonade_arcade package to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "lemonade_arcade")) +from lemonade_arcade.lemonade_client import LemonadeClient, LEMONADE_MINIMUM_VERSION + + +class TestLemonadeClient(unittest.TestCase): + def setUp(self): + """Set up test fixtures before each test method.""" + self.client = LemonadeClient() + + def test_init(self): + """Test LemonadeClient initialization.""" + client = LemonadeClient() + self.assertIsNone(client.server_command) + self.assertIsNone(client.server_process) + self.assertEqual(client.url, "http://localhost:8000") + + def test_is_pyinstaller_environment_false(self): + """Test is_pyinstaller_environment returns False when not in PyInstaller.""" + with patch.object(sys, "frozen", create=True, new=False): + result = self.client.is_pyinstaller_environment() + self.assertFalse(result) + + def test_is_pyinstaller_environment_true(self): + """Test is_pyinstaller_environment returns True when in PyInstaller.""" + with patch.object(sys, "frozen", create=True, new=True): + result = self.client.is_pyinstaller_environment() + self.assertTrue(result) + + def test_is_pyinstaller_environment_no_attribute(self): + """Test is_pyinstaller_environment returns False when frozen attribute doesn't exist.""" + if hasattr(sys, "frozen"): + delattr(sys, "frozen") + result = self.client.is_pyinstaller_environment() + self.assertFalse(result) + + @unittest.skipIf(platform.system() != "Windows", "Windows-specific test") + @patch("os.path.exists") + @patch("os.environ.get") + def test_find_lemonade_server_paths_windows(self, mock_env_get, mock_exists): + """Test finding lemonade server paths on Windows.""" + # Mock PATH with lemonade_server entries + mock_env_get.return_value = "C:\\Windows\\System32;C:\\lemonade_server\\bin;D:\\other\\bin\\lemonade_server" + mock_exists.side_effect = lambda path: "lemonade_server" in path + + with patch("sys.platform", "win32"): + paths = self.client.find_lemonade_server_paths() + expected_paths = [ + "C:\\lemonade_server\\bin", + "D:\\other\\bin\\lemonade_server", + ] + self.assertEqual(paths, expected_paths) + + @patch("os.path.exists") + @patch("os.environ.get") + def test_find_lemonade_server_paths_linux(self, mock_env_get, mock_exists): + """Test finding lemonade server paths on Linux.""" + mock_env_get.return_value = ( + "/usr/bin:/home/user/lemonade_server/bin:/usr/local/bin" + ) + mock_exists.side_effect = lambda path: "lemonade_server" in path + + with patch("sys.platform", "linux"): + paths = self.client.find_lemonade_server_paths() + expected_paths = ["/home/user/lemonade_server/bin"] + self.assertEqual(paths, expected_paths) + + @patch("os.path.exists") + @patch("os.environ.get") + def test_find_lemonade_server_paths_empty(self, mock_env_get, mock_exists): + """Test finding lemonade server paths returns empty list when none found.""" + mock_env_get.return_value = "/usr/bin:/usr/local/bin" + mock_exists.return_value = False + + paths = self.client.find_lemonade_server_paths() + self.assertEqual(paths, []) + + def test_reset_server_state(self): + """Test resetting server state.""" + # Set up initial state + self.client.server_command = ["test", "command"] + mock_process = MagicMock() + mock_process.poll.return_value = None # Process is running + self.client.server_process = mock_process + + self.client.reset_server_state() + + self.assertIsNone(self.client.server_command) + self.assertIsNone(self.client.server_process) + mock_process.terminate.assert_called_once() + + def test_reset_server_state_process_already_terminated(self): + """Test resetting server state when process is already terminated.""" + self.client.server_command = ["test", "command"] + mock_process = MagicMock() + mock_process.poll.return_value = 1 # Process has terminated + self.client.server_process = mock_process + + self.client.reset_server_state() + + self.assertIsNone(self.client.server_command) + self.assertIsNone(self.client.server_process) + mock_process.terminate.assert_not_called() + + def test_reset_server_state_terminate_exception(self): + """Test resetting server state when terminate raises exception.""" + self.client.server_command = ["test", "command"] + mock_process = MagicMock() + mock_process.poll.return_value = None + mock_process.terminate.side_effect = Exception("Test exception") + self.client.server_process = mock_process + + # Should not raise exception + self.client.reset_server_state() + + self.assertIsNone(self.client.server_command) + self.assertIsNone(self.client.server_process) + + @unittest.skipIf(platform.system() != "Windows", "Windows-specific test") + @patch("winreg.OpenKey") + @patch("winreg.QueryValueEx") + @patch("os.environ") + def test_refresh_environment_windows_success( + self, mock_environ, mock_query, mock_open_key + ): + """Test refreshing environment variables on Windows successfully.""" + # Mock registry access + mock_context_manager = MagicMock() + mock_open_key.return_value.__enter__.return_value = mock_context_manager + mock_query.side_effect = [ + ("C:\\Windows\\System32;C:\\Program Files", 1), # System PATH + ("C:\\Users\\User\\bin", 1), # User PATH + ] + + # Mock Python Scripts discovery to return empty list + with patch.object( + self.client, "_discover_python_scripts_paths", return_value=[] + ): + self.client.refresh_environment() + + # Check that PATH was updated + expected_path = "C:\\Users\\User\\bin;C:\\Windows\\System32;C:\\Program Files" + mock_environ.__setitem__.assert_called_with("PATH", expected_path) + + @unittest.skipIf(platform.system() != "Windows", "Windows-specific test") + @patch("winreg.OpenKey") + @patch("winreg.QueryValueEx") + def test_refresh_environment_windows_no_user_path(self, mock_query, mock_open_key): + """Test refreshing environment variables when user PATH doesn't exist.""" + mock_context_manager = MagicMock() + mock_open_key.return_value.__enter__.return_value = mock_context_manager + mock_query.side_effect = [ + ("C:\\Windows\\System32", 1), # System PATH + FileNotFoundError(), # User PATH not found + ] + + with patch("os.environ") as mock_environ: + # Mock Python Scripts discovery to return empty list + with patch.object( + self.client, "_discover_python_scripts_paths", return_value=[] + ): + self.client.refresh_environment() + # Should still set PATH to system PATH only + mock_environ.__setitem__.assert_called_with("PATH", "C:\\Windows\\System32") + + @unittest.skipIf(platform.system() != "Windows", "Windows-specific test") + @patch("winreg.OpenKey") + def test_refresh_environment_windows_exception(self, mock_open_key): + """Test refreshing environment variables when registry access fails.""" + mock_open_key.side_effect = Exception("Registry error") + + # Should not raise exception + self.client.refresh_environment() + + async def test_execute_lemonade_server_command_with_stored_command(self): + """Test executing command when server_command is already stored.""" + self.client.server_command = ["lemonade-server"] + + with patch("subprocess.run") as mock_run: + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "test output" + mock_result.stderr = "" + mock_run.return_value = mock_result + + result = await self.client.execute_lemonade_server_command(["--version"]) + + self.assertEqual(result, mock_result) + mock_run.assert_called_once() + # Check that the command includes the stored command + args + args, kwargs = mock_run.call_args + self.assertIn("lemonade-server --version", args[0]) + + @unittest.skipIf(platform.system() != "Windows", "Windows-specific test") + async def test_execute_lemonade_server_command_windows_success(self): + """Test executing command on Windows with successful result.""" + with patch("sys.platform", "win32"), patch.object( + self.client, "is_pyinstaller_environment", return_value=False + ), patch.object( + self.client, "find_lemonade_server_paths", return_value=[] + ), patch( + "subprocess.run" + ) as mock_run: + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "version 8.1.5" + mock_result.stderr = "" + mock_run.return_value = mock_result + + result = await self.client.execute_lemonade_server_command(["--version"]) + + self.assertEqual(result, mock_result) + self.assertEqual(self.client.server_command, ["lemonade-server-dev"]) + + async def test_execute_lemonade_server_command_linux_success(self): + """Test executing command on Linux with successful result.""" + with patch("sys.platform", "linux"), patch("subprocess.run") as mock_run: + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "version 8.1.5" + mock_result.stderr = "" + mock_run.return_value = mock_result + + result = await self.client.execute_lemonade_server_command(["--version"]) + + self.assertEqual(result, mock_result) + self.assertEqual(self.client.server_command, ["lemonade-server-dev"]) + + async def test_execute_lemonade_server_command_popen_mode(self): + """Test executing command with use_popen=True.""" + with patch("subprocess.Popen") as mock_popen: + mock_process = MagicMock() + mock_popen.return_value = mock_process + + result = await self.client.execute_lemonade_server_command( + ["serve"], use_popen=True + ) + + self.assertEqual(result, mock_process) + mock_popen.assert_called_once() + + async def test_execute_lemonade_server_command_all_fail(self): + """Test executing command when all commands fail.""" + with patch("sys.platform", "linux"), patch("subprocess.run") as mock_run: + + mock_run.side_effect = FileNotFoundError("Command not found") + + result = await self.client.execute_lemonade_server_command(["--version"]) + + self.assertIsNone(result) + + async def test_execute_lemonade_server_command_timeout(self): + """Test executing command with timeout.""" + with patch("sys.platform", "linux"), patch("subprocess.run") as mock_run: + + mock_run.side_effect = subprocess.TimeoutExpired("cmd", 10) + + result = await self.client.execute_lemonade_server_command(["--version"]) + + self.assertIsNone(result) + + async def test_check_lemonade_sdk_available_true(self): + """Test checking lemonade-sdk availability when available.""" + with patch("subprocess.run") as mock_run: + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "available" + mock_run.return_value = mock_result + + result = await self.client.check_lemonade_sdk_available() + + self.assertTrue(result) + + async def test_check_lemonade_sdk_available_false(self): + """Test checking lemonade-sdk availability when not available.""" + with patch("subprocess.run") as mock_run: + mock_result = MagicMock() + mock_result.returncode = 1 + mock_result.stdout = "error" + mock_run.return_value = mock_result + + result = await self.client.check_lemonade_sdk_available() + + self.assertFalse(result) + + async def test_check_lemonade_sdk_available_exception(self): + """Test checking lemonade-sdk availability when exception occurs.""" + with patch("subprocess.run", side_effect=Exception("Test error")): + result = await self.client.check_lemonade_sdk_available() + self.assertFalse(result) + + async def test_check_lemonade_server_version_success(self): + """Test checking lemonade server version successfully.""" + with patch.object(self.client, "execute_lemonade_server_command") as mock_exec: + mock_result = MagicMock() + mock_result.stdout = "lemonade-server 8.1.5" + mock_exec.return_value = mock_result + + result = await self.client.check_lemonade_server_version() + + expected = { + "installed": True, + "version": "8.1.5", + "compatible": True, + "required_version": LEMONADE_MINIMUM_VERSION, + } + self.assertEqual(result, expected) + + async def test_check_lemonade_server_version_incompatible(self): + """Test checking lemonade server version with incompatible version.""" + with patch.object(self.client, "execute_lemonade_server_command") as mock_exec: + mock_result = MagicMock() + mock_result.stdout = "lemonade-server 8.1.0" # Lower than minimum + mock_exec.return_value = mock_result + + result = await self.client.check_lemonade_server_version() + + expected = { + "installed": True, + "version": "8.1.0", + "compatible": False, + "required_version": LEMONADE_MINIMUM_VERSION, + } + self.assertEqual(result, expected) + + async def test_check_lemonade_server_version_no_version_match(self): + """Test checking lemonade server version when no version is found in output.""" + with patch.object(self.client, "execute_lemonade_server_command") as mock_exec: + mock_result = MagicMock() + mock_result.stdout = "invalid version output" + mock_exec.return_value = mock_result + + result = await self.client.check_lemonade_server_version() + + expected = { + "installed": True, + "version": "unknown", + "compatible": False, + "required_version": LEMONADE_MINIMUM_VERSION, + } + self.assertEqual(result, expected) + + async def test_check_lemonade_server_version_command_failed(self): + """Test checking lemonade server version when command fails.""" + with patch.object(self.client, "execute_lemonade_server_command") as mock_exec: + mock_exec.return_value = None + + result = await self.client.check_lemonade_server_version() + + expected = { + "installed": False, + "version": None, + "compatible": False, + "required_version": LEMONADE_MINIMUM_VERSION, + } + self.assertEqual(result, expected) + + async def test_check_lemonade_server_running_true(self): + """Test checking if lemonade server is running - returns True.""" + with patch.object(self.client, "execute_lemonade_server_command") as mock_exec: + mock_result = MagicMock() + mock_result.stdout = "Server is running on port 8000" + mock_exec.return_value = mock_result + + result = await self.client.check_lemonade_server_running() + + self.assertTrue(result) + + async def test_check_lemonade_server_running_false(self): + """Test checking if lemonade server is running - returns False.""" + with patch.object(self.client, "execute_lemonade_server_command") as mock_exec: + mock_result = MagicMock() + mock_result.stdout = "Server is not running" + mock_exec.return_value = mock_result + + result = await self.client.check_lemonade_server_running() + + self.assertFalse(result) + + async def test_check_lemonade_server_running_command_failed(self): + """Test checking if lemonade server is running when command fails.""" + with patch.object(self.client, "execute_lemonade_server_command") as mock_exec: + mock_exec.return_value = None + + result = await self.client.check_lemonade_server_running() + + self.assertFalse(result) + + async def test_start_lemonade_server_already_running(self): + """Test starting lemonade server when it's already running.""" + mock_process = MagicMock() + mock_process.poll.return_value = None # Process is running + self.client.server_process = mock_process + + result = await self.client.start_lemonade_server() + + expected = {"success": True, "message": "Server is already running"} + self.assertEqual(result, expected) + + @patch("tempfile.NamedTemporaryFile") + @patch("time.sleep") + @patch("os.unlink") + async def test_start_lemonade_server_success( + self, mock_unlink, mock_sleep, mock_temp_file + ): + """Test starting lemonade server successfully.""" + # Mock temp files + mock_stdout_file = MagicMock() + mock_stderr_file = MagicMock() + mock_stdout_file.name = "stdout.log" + mock_stderr_file.name = "stderr.log" + mock_temp_file.side_effect = [mock_stdout_file, mock_stderr_file] + + # Mock process + mock_process = MagicMock() + mock_process.poll.return_value = None # Process is running + mock_process.pid = 12345 + + with patch.object(self.client, "execute_lemonade_server_command") as mock_exec: + mock_exec.return_value = mock_process + + result = await self.client.start_lemonade_server() + + expected = {"success": True, "message": "Server start command issued"} + self.assertEqual(result, expected) + self.assertEqual(self.client.server_process, mock_process) + + @patch("tempfile.NamedTemporaryFile") + async def test_start_lemonade_server_command_failed(self, mock_temp_file): + """Test starting lemonade server when command fails.""" + mock_stdout_file = MagicMock() + mock_stderr_file = MagicMock() + mock_temp_file.side_effect = [mock_stdout_file, mock_stderr_file] + + with patch.object(self.client, "execute_lemonade_server_command") as mock_exec: + mock_exec.return_value = None + + result = await self.client.start_lemonade_server() + + expected = { + "success": False, + "message": "Failed to start server: all commands failed", + } + self.assertEqual(result, expected) + + @patch("tempfile.NamedTemporaryFile") + @patch("time.sleep") + @patch("builtins.open", new_callable=mock_open, read_data="Error message") + @patch("os.unlink") + async def test_start_lemonade_server_process_dies( + self, mock_unlink, mock_file, mock_sleep, mock_temp_file + ): + """Test starting lemonade server when process dies immediately.""" + mock_stdout_file = MagicMock() + mock_stderr_file = MagicMock() + mock_stdout_file.name = "stdout.log" + mock_stderr_file.name = "stderr.log" + mock_temp_file.side_effect = [mock_stdout_file, mock_stderr_file] + + mock_process = MagicMock() + mock_process.poll.return_value = 1 # Process has died + mock_process.returncode = 1 + + with patch.object(self.client, "execute_lemonade_server_command") as mock_exec: + mock_exec.return_value = mock_process + + result = await self.client.start_lemonade_server() + + expected = {"success": False, "message": "Server process died immediately"} + self.assertEqual(result, expected) + + async def test_install_lemonade_sdk_package_success(self): + """Test installing lemonade-sdk package successfully.""" + with patch("subprocess.run") as mock_run: + mock_result = MagicMock() + mock_result.returncode = 0 + mock_run.return_value = mock_result + + result = await self.client.install_lemonade_sdk_package() + + expected = { + "success": True, + "message": "lemonade-sdk package installed successfully. You can now use 'lemonade-server-dev' command.", + } + self.assertEqual(result, expected) + + async def test_install_lemonade_sdk_package_failure(self): + """Test installing lemonade-sdk package with failure.""" + with patch("subprocess.run") as mock_run: + mock_result = MagicMock() + mock_result.returncode = 1 + mock_result.stderr = "Installation failed" + mock_run.return_value = mock_result + + result = await self.client.install_lemonade_sdk_package() + + self.assertFalse(result["success"]) + self.assertIn("pip install failed", result["message"]) + + async def test_install_lemonade_sdk_package_exception(self): + """Test installing lemonade-sdk package with exception.""" + with patch("subprocess.run", side_effect=Exception("Test error")): + result = await self.client.install_lemonade_sdk_package() + + self.assertFalse(result["success"]) + self.assertIn("Failed to install", result["message"]) + + @patch.object(LemonadeClient, "reset_server_state") + @patch.object(LemonadeClient, "is_pyinstaller_environment") + @patch.object(LemonadeClient, "install_lemonade_sdk_package") + async def test_download_and_install_lemonade_server_pip_success( + self, mock_install, mock_pyinstaller, mock_reset + ): + """Test downloading and installing lemonade server via pip successfully.""" + mock_pyinstaller.return_value = False + mock_install.return_value = {"success": True, "message": "Success"} + + result = await self.client.download_and_install_lemonade_server() + + self.assertTrue(result["success"]) + mock_reset.assert_called_once() + mock_install.assert_called_once() + + @patch.object(LemonadeClient, "reset_server_state") + @patch.object(LemonadeClient, "is_pyinstaller_environment") + @patch.object(LemonadeClient, "install_lemonade_sdk_package") + async def test_download_and_install_lemonade_server_pip_failure( + self, mock_install, mock_pyinstaller, mock_reset + ): + """Test downloading and installing lemonade server when pip fails.""" + mock_pyinstaller.return_value = False + mock_install.return_value = {"success": False, "message": "Pip failed"} + + result = await self.client.download_and_install_lemonade_server() + + self.assertFalse(result["success"]) + self.assertIn("github.com", result["message"]) + + @patch("tempfile.mkdtemp") + @patch("subprocess.Popen") + @patch("httpx.AsyncClient") + @patch.object(LemonadeClient, "reset_server_state") + @patch.object(LemonadeClient, "is_pyinstaller_environment") + async def test_download_and_install_lemonade_server_installer( + self, mock_pyinstaller, mock_reset, mock_httpx, mock_popen, mock_mkdtemp + ): + """Test downloading and installing lemonade server via installer.""" + mock_pyinstaller.return_value = True + mock_mkdtemp.return_value = "/tmp/test" + + # Mock HTTP client and response properly + mock_client = MagicMock() + mock_response = MagicMock() + mock_response.status_code = 200 + + # Mock aiter_bytes as an async generator + async def mock_aiter_bytes(chunk_size=8192): + yield b"test data" + + mock_response.aiter_bytes = mock_aiter_bytes + + # Mock the stream context manager + mock_stream_context = MagicMock() + mock_stream_context.__aenter__ = AsyncMock(return_value=mock_response) + mock_stream_context.__aexit__ = AsyncMock(return_value=None) + mock_client.stream.return_value = mock_stream_context + + # Mock the client context manager + mock_client_context = MagicMock() + mock_client_context.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_context.__aexit__ = AsyncMock(return_value=None) + mock_httpx.return_value = mock_client_context + + mock_process = MagicMock() + mock_popen.return_value = mock_process + + with patch("builtins.open", mock_open()): + result = await self.client.download_and_install_lemonade_server() + + self.assertTrue(result["success"]) + self.assertTrue(result["interactive"]) + mock_reset.assert_called_once() + + async def test_check_lemonade_server_api_success(self): + """Test checking lemonade server API successfully.""" + mock_response = MagicMock() + mock_response.status_code = 200 + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.get.return_value = ( + mock_response + ) + + result = await self.client.check_lemonade_server_api() + + self.assertTrue(result) + + async def test_check_lemonade_server_api_health_endpoint(self): + """Test checking lemonade server API using health endpoint.""" + mock_models_response = MagicMock() + mock_models_response.status_code = 404 + mock_health_response = MagicMock() + mock_health_response.status_code = 200 + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.get.side_effect = [ + mock_models_response, + mock_health_response, + ] + + result = await self.client.check_lemonade_server_api() + + self.assertTrue(result) + + async def test_check_lemonade_server_api_timeout(self): + """Test checking lemonade server API with timeout.""" + with patch("httpx.AsyncClient") as mock_client, patch("asyncio.sleep"): + mock_client.return_value.__aenter__.return_value.get.side_effect = ( + httpx.TimeoutException("Timeout") + ) + + result = await self.client.check_lemonade_server_api() + + self.assertFalse(result) + + async def test_check_lemonade_server_api_connection_error(self): + """Test checking lemonade server API with connection error.""" + with patch("httpx.AsyncClient") as mock_client, patch("asyncio.sleep"): + mock_client.return_value.__aenter__.return_value.get.side_effect = ( + httpx.ConnectError("Connection failed") + ) + + result = await self.client.check_lemonade_server_api() + + self.assertFalse(result) + + async def test_get_available_models_success(self): + """Test getting available models successfully.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"data": [{"id": "model1"}, {"id": "model2"}]} + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.get.return_value = ( + mock_response + ) + + result = await self.client.get_available_models() + + self.assertEqual(result, ["model1", "model2"]) + + async def test_get_available_models_failure(self): + """Test getting available models with API failure.""" + mock_response = MagicMock() + mock_response.status_code = 500 + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.get.return_value = ( + mock_response + ) + + result = await self.client.get_available_models() + + self.assertEqual(result, []) + + async def test_get_available_models_exception(self): + """Test getting available models with exception.""" + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.get.side_effect = ( + Exception("Test error") + ) + + result = await self.client.get_available_models() + + self.assertEqual(result, []) + + async def test_check_model_installed_true(self): + """Test checking if model is installed - returns True.""" + with patch.object(self.client, "get_available_models") as mock_get_models: + mock_get_models.return_value = ["model1", "model2", "test_model"] + + result = await self.client.check_model_installed("test_model") + + expected = {"installed": True, "model_name": "test_model"} + self.assertEqual(result, expected) + + async def test_check_model_installed_false(self): + """Test checking if model is installed - returns False.""" + with patch.object(self.client, "get_available_models") as mock_get_models: + mock_get_models.return_value = ["model1", "model2"] + + result = await self.client.check_model_installed("test_model") + + expected = {"installed": False, "model_name": "test_model"} + self.assertEqual(result, expected) + + async def test_check_model_installed_exception(self): + """Test checking if model is installed with exception.""" + with patch.object(self.client, "get_available_models") as mock_get_models: + mock_get_models.side_effect = Exception("Test error") + + result = await self.client.check_model_installed("test_model") + + expected = {"installed": False, "model_name": "test_model"} + self.assertEqual(result, expected) + + async def test_check_model_loaded_true(self): + """Test checking if model is loaded - returns True.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"model_loaded": "test_model"} + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.get.return_value = ( + mock_response + ) + + result = await self.client.check_model_loaded("test_model") + + expected = { + "loaded": True, + "model_name": "test_model", + "current_model": "test_model", + } + self.assertEqual(result, expected) + + async def test_check_model_loaded_false(self): + """Test checking if model is loaded - returns False.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"model_loaded": "other_model"} + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.get.return_value = ( + mock_response + ) + + result = await self.client.check_model_loaded("test_model") + + expected = { + "loaded": False, + "model_name": "test_model", + "current_model": "other_model", + } + self.assertEqual(result, expected) + + async def test_check_model_loaded_api_failure(self): + """Test checking if model is loaded with API failure.""" + mock_response = MagicMock() + mock_response.status_code = 500 + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.get.return_value = ( + mock_response + ) + + result = await self.client.check_model_loaded("test_model") + + expected = { + "loaded": False, + "model_name": "test_model", + "current_model": None, + } + self.assertEqual(result, expected) + + async def test_check_model_loaded_exception(self): + """Test checking if model is loaded with exception.""" + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.get.side_effect = ( + Exception("Test error") + ) + + result = await self.client.check_model_loaded("test_model") + + expected = { + "loaded": False, + "model_name": "test_model", + "current_model": None, + } + self.assertEqual(result, expected) + + async def test_install_model_success(self): + """Test installing model successfully.""" + mock_response = MagicMock() + mock_response.status_code = 200 + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.post.return_value = ( + mock_response + ) + + result = await self.client.install_model("test_model") + + expected = { + "success": True, + "message": "Model test_model installed successfully", + } + self.assertEqual(result, expected) + + async def test_install_model_failure(self): + """Test installing model with API failure.""" + mock_response = MagicMock() + mock_response.status_code = 500 + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.post.return_value = ( + mock_response + ) + + result = await self.client.install_model("test_model") + + self.assertFalse(result["success"]) + self.assertIn("Failed to install model", result["message"]) + + async def test_install_model_timeout(self): + """Test installing model with timeout.""" + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.post.side_effect = ( + httpx.TimeoutException("Timeout") + ) + + result = await self.client.install_model("test_model") + + self.assertFalse(result["success"]) + self.assertIn("timed out", result["message"]) + + async def test_install_model_exception(self): + """Test installing model with exception.""" + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.post.side_effect = ( + Exception("Test error") + ) + + result = await self.client.install_model("test_model") + + self.assertFalse(result["success"]) + self.assertIn("Error installing model", result["message"]) + + async def test_load_model_success(self): + """Test loading model successfully.""" + mock_response = MagicMock() + mock_response.status_code = 200 + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.post.return_value = ( + mock_response + ) + + result = await self.client.load_model("test_model") + + expected = { + "success": True, + "message": "Model test_model loaded successfully", + } + self.assertEqual(result, expected) + + async def test_load_model_failure(self): + """Test loading model with API failure.""" + mock_response = MagicMock() + mock_response.status_code = 500 + + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.post.return_value = ( + mock_response + ) + + result = await self.client.load_model("test_model") + + self.assertFalse(result["success"]) + self.assertIn("Failed to load model", result["message"]) + + async def test_load_model_timeout(self): + """Test loading model with timeout.""" + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.post.side_effect = ( + httpx.TimeoutException("Timeout") + ) + + result = await self.client.load_model("test_model") + + self.assertFalse(result["success"]) + self.assertIn("timed out", result["message"]) + + async def test_load_model_exception(self): + """Test loading model with exception.""" + with patch("httpx.AsyncClient") as mock_client: + mock_client.return_value.__aenter__.return_value.post.side_effect = ( + Exception("Test error") + ) + + result = await self.client.load_model("test_model") + + self.assertFalse(result["success"]) + self.assertIn("Error loading model", result["message"]) + + +def run_async_test(coro): + """Helper function to run async tests.""" + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + return loop.run_until_complete(coro) + finally: + # Clean up any pending tasks + pending = asyncio.all_tasks(loop) + for task in pending: + task.cancel() + if pending: + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + + +# Convert async test methods to sync for unittest +async_test_methods = [] +for attr_name in dir(TestLemonadeClient): + attr = getattr(TestLemonadeClient, attr_name) + if ( + callable(attr) + and attr_name.startswith("test_") + and asyncio.iscoroutinefunction(attr) + ): + async_test_methods.append((attr_name, attr)) + +# Apply the conversion with proper closure handling +for attr_name, original_method in async_test_methods: + + def make_sync_test(method): + def sync_test(self): + return run_async_test(method(self)) + + return sync_test + + setattr(TestLemonadeClient, attr_name, make_sync_test(original_method)) + + +if __name__ == "__main__": + unittest.main()