Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pip install MergePythonClient

A full reference for this library is available [here](https://github.com/merge-api/merge-python-client/blob/HEAD/./reference.md).

### Merge Assurance

Merge Assurance is a feature that proactively monitors for expiring API credentials and automatically refreshes them. You can find more information about it in the [Remediation: Merge Assurance Agent](https://github.com/merge-api/merge-python-client/blob/HEAD/./reference.md#remediation-merge-assurance-agent) section of the reference guide.

## Usage

Instantiate and use the client with the following:
Expand Down
99 changes: 99 additions & 0 deletions e2e_test/mock_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# A stateful mock server that simulates the Merge API for E2E tests.

from __future__ import annotations

import logging
import sys
import typing
from datetime import datetime, timedelta

from flask import Flask, jsonify, request

# --- Server Configuration ---
HOST = "127.0.0.1"
PORT = 5002

# --- In-Memory "Database" ---
# This global state allows the server to remember changes, like a token refresh.
mock_db: typing.Dict[str, typing.Dict[str, typing.Any]] = {}

def reset_mock_db() -> None:
"""Resets the database to its initial state."""
global mock_db
now = datetime.now()
mock_db = {
"token_ok_1": {"expires_at": (now + timedelta(days=45)).timestamp(), "refreshed": False},
"token_expiring_soon": {"expires_at": (now + timedelta(days=15)).timestamp(), "refreshed": False},
# This token will fail on its first refresh attempt, then succeed.
"token_retry_success": {
"expires_at": (now + timedelta(days=20)).timestamp(),
"refreshed": False,
"attempts": 0,
},
"token_immediate_failure": {"expires_at": (now + timedelta(days=5)).timestamp(), "refreshed": False},
}

# --- Logging Setup ---
# Configure a logger for the mock server
log = logging.getLogger("MOCK_SERVER")
log.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
# Silence the default Werkzeug logger to use our own.
logging.getLogger('werkzeug').setLevel(logging.ERROR)


# --- Flask App ---
app = Flask(__name__)

@app.route("/api/v1/credentials", methods=["GET"])
def get_credentials() -> typing.Any:
"""Returns the current state of credentials."""
log.info("Request received for GET /api/v1/credentials")
return jsonify(mock_db)

@app.route("/api/v1/refresh-token", methods=["POST"])
def refresh_token() -> typing.Any:
"""Simulates the token refresh logic."""
data = request.get_json()
token = data.get("account_token")
log.info(f"Request received for POST /api/v1/refresh-token for token: {token}")

if not token or token not in mock_db:
return jsonify({"error": "Invalid Token"}), 404

# --- Simulation Logic ---
if token == "token_expiring_soon":
# On success, update the DB to show the token is no longer expiring.
mock_db[token]["expires_at"] = (datetime.now() + timedelta(days=60)).timestamp()
mock_db[token]["refreshed"] = True
log.info(f"Successfully refreshed '{token}'. It now expires in 60 days.")
return jsonify({"status": "refreshed"}), 200

if token == "token_retry_success":
mock_db[token]["attempts"] += 1
# Fail on the first attempt to simulate a transient network error.
if mock_db[token]["attempts"] <= 1:
log.info(f"Simulating retryable failure for '{token}'. Attempt #{mock_db[token]['attempts']}.")
return jsonify({"error": "Service Temporarily Unavailable"}), 503
else:
# Succeed on the second attempt.
mock_db[token]["expires_at"] = (datetime.now() + timedelta(days=60)).timestamp()
mock_db[token]["refreshed"] = True
log.info(f"Successfully refreshed '{token}' after {mock_db[token]['attempts']} attempts.")
return jsonify({"status": "refreshed"}), 200

if token == "token_immediate_failure":
return jsonify({"error": "Invalid Refresh Token"}), 401

# Default case for other tokens
return jsonify({"status": "no_action_needed"}), 200

# --- Main Execution ---
if __name__ == "__main__":
reset_mock_db()
log.info(f"Starting Mock Merge API Server on http://{HOST}:{PORT}")
# Use a production-ready server like Waitress for cleaner logs
app.run(host=HOST, port=PORT)
129 changes: 129 additions & 0 deletions e2e_test/sample_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# A self-contained script to demonstrate the Assurance Agent in an E2E test.

from __future__ import annotations

import logging
import os
import signal
import subprocess
import sys
import time
import typing

from merge.client import Merge
from merge.remediation.agent import AssuranceAgent

# --- App Configuration ---
MOCK_API_BASE_URL = "http://127.0.0.1:5002"
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

# --- Global State ---
agent_handle: typing.Optional[AssuranceAgent] = None


# A custom formatter to add color to the log output for readability.
class ColoredFormatter(logging.Formatter):
"""A logging formatter that adds color to log messages."""

COLORS = {
"SUCCESS_CALLBACK": "\033[92m", # Green
"FAILURE_CALLBACK": "\033[91m", # Red
"ENDC": "\033[0m",
}

def format(self, record: logging.LogRecord) -> str:
log_message = super().format(record)
# For our specific callbacks, find and replace the keyword.
# This is a simple approach; a more robust one might inspect record attributes.
for key, color in self.COLORS.items():
if key in log_message:
log_message = log_message.replace(key, f"{color}{key}{self.COLORS['ENDC']}")
return log_message


# --- Logging Setup ---
def configure_logging() -> None:
"""Sets up readable, colored logging for the application."""
handler = logging.StreamHandler(sys.stdout)
# Use our new ColoredFormatter and a simpler log format
formatter = ColoredFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logging.basicConfig(level=LOG_LEVEL, handlers=[handler], force=True)
# Give httpx its own logger to reduce noise unless we're debugging
logging.getLogger("httpx").setLevel(logging.WARNING if LOG_LEVEL != "DEBUG" else logging.DEBUG)


# --- Merge Assurance Callbacks ---
def on_refresh_success(account_token: str) -> None:
"""Callback for when a token is successfully refreshed."""
# Log a plain string so the ColoredFormatter can process it.
logging.info(f"SUCCESS_CALLBACK: Token '{account_token}' was refreshed.")


def on_refresh_failure(account_token: str, error: Exception) -> None:
"""Callback for when a token refresh fails."""
# Log a plain string so the ColoredFormatter can process it.
logging.error(f"FAILURE_CALLBACK: Token '{account_token}' failed. Error: {error}")


# --- Main Execution ---
if __name__ == "__main__":
configure_logging()
server_process = None

# Use a context manager for the server process to ensure it's always cleaned up.
try:
logging.info("Starting mock API server...")
server_command = [sys.executable, "e2e_test/mock_server.py"]
server_process = subprocess.Popen(
server_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
preexec_fn=os.setsid
)
time.sleep(2) # Give the server a moment to start up

if server_process.poll() is not None:
raise RuntimeError("Mock server failed to start. Is the port already in use?")

# Initialize the Merge client
client = Merge(
api_key="YOUR_API_KEY",
account_token="YOUR_ACCOUNT_TOKEN",
base_url=MOCK_API_BASE_URL
)

agent_handle = client.remediation.enable_assurance(
on_success=on_refresh_success,
on_failure=on_refresh_failure,
check_interval_seconds=6, # Short interval for demo
expiry_threshold_days=30,
)

logging.info("--- DEMO START: Agent running for 30 seconds. ---")
time.sleep(30)
logging.info("--- DEMO END ---")

finally:
if agent_handle:
logging.info("Shutting down Assurance Agent...")
agent_handle.shutdown()

if server_process:
logging.info("Stopping mock API server...")
try:
os.killpg(os.getpgid(server_process.pid), signal.SIGTERM)
server_process.wait(timeout=5)
logging.info("Mock API server stopped.")
except (ProcessLookupError, OSError) as e:
logging.warning(f"Could not kill mock server, it may have already exited. Error: {e}")

stdout, stderr = server_process.communicate()
if stderr:
# Don't log expected 'Address already in use' as an error if we catch it.
if "Address already in use" not in stderr:
logging.error(f"Mock server stderr: {stderr.strip()}")


logging.info("Demonstration complete.")
Loading