diff --git a/examples/templates/procurement_approval_agent/README.md b/examples/templates/procurement_approval_agent/README.md new file mode 100644 index 0000000000..7c640174c0 --- /dev/null +++ b/examples/templates/procurement_approval_agent/README.md @@ -0,0 +1,210 @@ +# Procurement Approval Agent + +Continuous procurement workflow agent with adaptive QuickBooks routing and first-run onboarding. + +## Features + +- First-run setup wizard (`setup-wizard`) to choose preferred sync mode +- Adaptive integration routing: +- API path when QuickBooks credentials exist +- CSV fallback path when credentials are missing +- Continuous monitoring mode for auto-trigger from JSON request files +- Optional background operation (`--daemon`) and launchd deployment on macOS +- Optional Slack/SMTP notifications on request completion + +## Installation + +From the repository root: + +```bash +export HIVE_AGENT_STORAGE_ROOT=/tmp/hive_agents +``` + +## Running Single Request (Mock) + +```bash +uv run python -m examples.templates.procurement_approval_agent run \ + --mock \ + --item "Laptop" \ + --cost 1200 \ + --department "engineering" \ + --requester "alice@company.com" \ + --justification "Need new laptop for ML development work" +``` + +## Real QuickBooks Credential Configuration + +Set all four required env vars to enable API mode: + +```bash +export QUICKBOOKS_CLIENT_ID=... +export QUICKBOOKS_CLIENT_SECRET=... +export QUICKBOOKS_REALM_ID=... +export QUICKBOOKS_REFRESH_TOKEN=... +``` + +Hive v0.6 credential namespace support: +- You can reference stored credentials using `{name}/{alias}`. +- Example: `quickbooks/default` +- CLI/monitor flag: `--qb-credential-ref quickbooks/default` +- Env fallback: `QUICKBOOKS_CREDENTIAL_REF=quickbooks/default` +- Env vars above still work and take precedence if set. + +Detection logic: +- If all vars exist -> `sync_method="api"` +- Else -> `sync_method="csv"` + +Current template supports mock API sync + CSV fallback generation for safe testing. 
+For real API mode (`--no-mock-qb`), the refresh token is required and access tokens are cached at: +- `${HIVE_AGENT_STORAGE_ROOT}/procurement_approval_agent/quickbooks_token_cache.json` + +## Adaptive Workflow + +Flow: +1. `setup-wizard` (first run only) +2. `intake` +3. `budget-check` +4. `manager-approval` (when needed) and `vendor-check` +5. `po-generator` +6. `integration-setup-check` (client-facing yes/no) +7. `integration-check` +8. `pre-sync-confirmation` (client-facing yes/no) +9. Branch A: `quickbooks-sync` +10. Branch B: `csv-export` +11. `notifications` + +Setup state is persisted at: +- `${HIVE_AGENT_STORAGE_ROOT}/procurement_approval_agent/setup_config.json` + +## Continuous Monitoring (Auto-Trigger) + +Watch for incoming request files and process forever: + +```bash +uv run python -m examples.templates.procurement_approval_agent monitor \ + --watch-dir /watched_requests \ + --poll-interval 2.0 \ + --mock +``` + +Duplicate guard: +- request fingerprint = `item + cost + department + requester` +- checked against last 24 hours +- duplicate requests are skipped with warning +- use `--force` to override + +Runtime control flags: +- `--interactive` prompts per request: +- process now? +- API credentials available this run? +- proceed with final sync/export? 
+- `--sync-method api|csv` forces routing without prompts +- `--skip-process` exits early to `request-cancelled` +- `--sync-cancel` exits after PO to `sync-cancelled` +- `--qb-available yes|no` declares credential availability for this run +- `--qb-credential-ref quickbooks/default` resolves QuickBooks creds from Hive credential store + +Request file format (`/watched_requests/*.json`): + +```json +{ + "item": "Laptop", + "cost": 1200, + "department": "engineering", + "requester": "alice@company.com", + "justification": "Need new laptop for ML development work", + "vendor": "TechSource LLC" +} +``` + +Folder behavior: +- New files: `/watched_requests/*.json` +- In-flight: `/watched_requests/processing/` +- Success archive: `/watched_requests/done/` +- Failure archive: `/watched_requests/failed/` +- Result JSON: `/watched_requests/results/.result.json` + +## Auto-Execution Hooks + +After request completion: +- If API mode -> mock QuickBooks sync writes `data/qb_mock_responses.json` +- If CSV mode -> creates: +- `data/po/_qb_manual_import.csv` +- `data/po/_qb_import_instructions.md` + +Optional file-manager reveal for CSV fallback: + +```bash +uv run python -m examples.templates.procurement_approval_agent monitor \ + --watch-dir /watched_requests \ + --mock \ + --auto-open-csv +``` + +## Notifications + +Slack (optional): +- Set `SLACK_WEBHOOK_URL` + +SMTP email (optional): +- `SMTP_HOST`, `SMTP_PORT`, `SMTP_FROM`, `SMTP_TO` +- Optional auth: `SMTP_USERNAME`, `SMTP_PASSWORD` + +## Background Service + +### Daemon Mode + +```bash +uv run python -m examples.templates.procurement_approval_agent monitor \ + --watch-dir /watched_requests \ + --mock \ + --force \ + --daemon \ + --log-file /tmp/procurement_approval_agent_monitor.log +``` + +Reset setup wizard state: + +```bash +uv run python -m examples.templates.procurement_approval_agent reset-setup +``` + +### macOS launchd + +Helper file: +- `examples/templates/procurement_approval_agent/deploy/install_launchd.sh` + 
+Install: + +```bash +bash examples/templates/procurement_approval_agent/deploy/install_launchd.sh +``` + +The installer generates a plist using your current repo path and supports overrides via +`WATCH_DIR`, `LOG_FILE`, `POLL_INTERVAL`, and `PLIST_DST`. + +Or generate a custom plist: + +```bash +uv run python -m examples.templates.procurement_approval_agent write-launchd +``` + +## Demo Script + +Run both adaptive paths end-to-end: + +```bash +bash examples/templates/procurement_approval_agent/demo_workflows.sh +``` + +## Validation + +```bash +uv run python -m examples.templates.procurement_approval_agent validate +uv run python -m examples.templates.procurement_approval_agent info +``` + +## Phase 2 (Planned) + +- Metrics/observability pipeline (counters, latency, dashboards) +- Daemon hardening (PID files, health checks, stronger lifecycle management) diff --git a/examples/templates/procurement_approval_agent/__init__.py b/examples/templates/procurement_approval_agent/__init__.py new file mode 100644 index 0000000000..567c4e2d5c --- /dev/null +++ b/examples/templates/procurement_approval_agent/__init__.py @@ -0,0 +1,38 @@ +"""Procurement Approval Agent package.""" + +from .agent import ( + ProcurementApprovalAgent, + conversation_mode, + default_agent, + edges, + entry_node, + entry_points, + goal, + identity_prompt, + loop_config, + nodes, + pause_nodes, + terminal_nodes, +) +from .config import default_config, metadata +from .monitor import RequestMonitor + +__version__ = "1.0.0" + +__all__ = [ + "ProcurementApprovalAgent", + "default_agent", + "goal", + "nodes", + "edges", + "entry_node", + "entry_points", + "pause_nodes", + "terminal_nodes", + "conversation_mode", + "identity_prompt", + "loop_config", + "default_config", + "metadata", + "RequestMonitor", +] diff --git a/examples/templates/procurement_approval_agent/__main__.py b/examples/templates/procurement_approval_agent/__main__.py new file mode 100644 index 0000000000..7ebdaaa099 --- /dev/null +++ 
b/examples/templates/procurement_approval_agent/__main__.py @@ -0,0 +1,364 @@ +"""CLI entry point for Procurement Approval Agent.""" + +import asyncio +import json +import os +from pathlib import Path +import sys + +import click + +from .agent import default_agent +from .monitor import RequestMonitor, spawn_daemon, write_launchd_plist +from .nodes.quickbooks import has_quickbooks_api_credentials + + +@click.group() +@click.version_option(version="1.0.0") +def cli() -> None: + """Procurement Approval Agent.""" + + +def _setup_state_path() -> Path: + storage_root = Path( + os.environ.get("HIVE_AGENT_STORAGE_ROOT", str(Path.home() / ".hive" / "agents")) + ) + return storage_root / "procurement_approval_agent" / "setup_config.json" + + +@cli.command() +@click.option("--item", required=True) +@click.option("--cost", type=float, required=True) +@click.option("--department", required=True) +@click.option("--requester", required=True) +@click.option("--justification", required=True) +@click.option("--vendor", default="Unknown") +@click.option("--mock", is_flag=True, help="Run with no LLM provider") +@click.option( + "--mock-qb/--no-mock-qb", + default=True, + help="Mock QuickBooks API sync (default: enabled for testing).", +) +@click.option( + "--interactive/--no-interactive", + default=False, + help="Prompt for yes/no workflow checkpoints.", +) +@click.option( + "--process/--skip-process", + default=True, + help="Default request processing decision.", +) +@click.option( + "--sync-confirm/--sync-cancel", + default=True, + help="Default final sync/export decision.", +) +@click.option( + "--sync-method", + type=click.Choice(["auto", "api", "csv"]), + default="auto", + show_default=True, + help="Force sync route or auto-detect from QuickBooks credentials.", +) +@click.option( + "--qb-available", + type=click.Choice(["auto", "yes", "no"]), + default="auto", + show_default=True, + help="Declare QuickBooks API credential availability for this run.", +) +@click.option( + 
"--qb-credential-ref", + default=None, + help="Hive credential reference in {name}/{alias} format (example: quickbooks/default).", +) +def run( + item, + cost, + department, + requester, + justification, + vendor, + mock, + mock_qb, + interactive, + process, + sync_confirm, + sync_method, + qb_available, + qb_credential_ref, +) -> None: + """Submit a purchase request.""" + context = { + "item": item, + "cost": cost, + "department": department, + "requester": requester, + "justification": justification, + "vendor": vendor, + } + if qb_credential_ref: + context["qb_credential_ref"] = qb_credential_ref + if interactive: + process = click.confirm("Process this purchase request now?", default=process) + context["process_request"] = process + if process: + default_has_qb = has_quickbooks_api_credentials() + if qb_credential_ref: + default_has_qb = has_quickbooks_api_credentials( + credential_ref=qb_credential_ref + ) + has_qb = click.confirm( + "Do you have QuickBooks API credentials configured for this run?", + default=default_has_qb, + ) + context["declared_qb_api_available"] = has_qb + context["declared_sync_preference"] = "api" if has_qb else "csv" + context["sync_confirmed"] = click.confirm( + "Proceed with final sync/export step after PO generation?", + default=sync_confirm, + ) + else: + context["sync_confirmed"] = False + else: + context["process_request"] = process + context["sync_confirmed"] = sync_confirm + if sync_method in {"api", "csv"}: + context["declared_sync_preference"] = sync_method + context["declared_qb_api_available"] = sync_method == "api" + elif qb_available in {"yes", "no"}: + has_qb = qb_available == "yes" + context["declared_qb_api_available"] = has_qb + context["declared_sync_preference"] = "api" if has_qb else "csv" + + result = asyncio.run(default_agent.run(context, mock_mode=mock, mock_qb=mock_qb)) + + output_data = { + "success": result.success, + "steps_executed": result.steps_executed, + "output": result.output, + } + if result.error: + 
output_data["error"] = result.error + if not mock_qb: + output_data["quickbooks_note"] = ( + "Real QuickBooks sync path is reserved for future credential/API integration." + ) + + click.echo(json.dumps(output_data, indent=2, default=str)) + sys.exit(0 if result.success else 1) + + +@cli.command() +def tui() -> None: + """Launch TUI for interactive approval workflow.""" + click.echo("Use Hive TUI runner to launch this agent interactively.") + click.echo( + "Example: ./hive run examples/templates/procurement_approval_agent --tui" + ) + + +@cli.command() +@click.option("--json", "output_json", is_flag=True) +def info(output_json) -> None: + """Show agent information.""" + info_data = default_agent.info() + if output_json: + click.echo(json.dumps(info_data, indent=2)) + return + + click.echo(f"Agent: {info_data['name']}") + click.echo(f"Version: {info_data['version']}") + click.echo(f"Description: {info_data['description']}") + click.echo(f"Nodes: {', '.join(info_data['nodes'])}") + click.echo(f"Client-facing: {', '.join(info_data['client_facing_nodes'])}") + click.echo(f"Entry: {info_data['entry_node']}") + click.echo(f"Terminal: {', '.join(info_data['terminal_nodes'])}") + + +@cli.command() +def validate() -> None: + """Validate agent structure.""" + validation = default_agent.validate() + if validation["valid"]: + click.echo("Agent is valid") + for warning in validation["warnings"]: + click.echo(f" WARNING: {warning}") + sys.exit(0) + + click.echo("Agent has errors:") + for error in validation["errors"]: + click.echo(f" ERROR: {error}") + sys.exit(1) + + +@cli.command() +@click.option("--watch-dir", default="/watched_requests", show_default=True) +@click.option("--poll-interval", default=2.0, show_default=True, type=float) +@click.option("--mock", is_flag=True, help="Run requests with mock LLM.") +@click.option( + "--mock-qb/--no-mock-qb", + default=True, + help="Mock QuickBooks API sync path (default: enabled).", +) +@click.option( + "--auto-open-csv", + 
is_flag=True, + help="Reveal CSV export in file manager on fallback.", +) +@click.option( + "--notify/--no-notify", + default=True, + help="Send Slack/SMTP notifications if configured.", +) +@click.option("--force", is_flag=True, help="Override 24-hour duplicate request check.") +@click.option( + "--interactive/--no-interactive", + default=False, + help="Prompt for yes/no workflow checkpoints.", +) +@click.option( + "--process/--skip-process", + default=True, + help="Default request processing decision.", +) +@click.option( + "--sync-confirm/--sync-cancel", + default=True, + help="Default final sync/export decision.", +) +@click.option( + "--sync-method", + type=click.Choice(["auto", "api", "csv"]), + default="auto", + show_default=True, + help="Force sync route or auto-detect from QuickBooks credentials.", +) +@click.option( + "--qb-available", + type=click.Choice(["auto", "yes", "no"]), + default="auto", + show_default=True, + help="Declare QuickBooks API credential availability for this run.", +) +@click.option( + "--qb-credential-ref", + default=None, + help="Hive credential reference in {name}/{alias} format (example: quickbooks/default).", +) +@click.option("--daemon/--no-daemon", default=False, help="Run monitor in background.") +@click.option( + "--log-file", + default="/tmp/procurement_approval_agent_monitor.log", + show_default=True, +) +def monitor( + watch_dir, + poll_interval, + mock, + mock_qb, + auto_open_csv, + notify, + force, + interactive, + process, + sync_confirm, + sync_method, + qb_available, + qb_credential_ref, + daemon, + log_file, +) -> None: + """Continuously monitor request folder and auto-process new JSON requests.""" + watch_path = Path(watch_dir).expanduser() + if daemon and interactive: + click.echo("--interactive is not supported with --daemon.", err=True) + sys.exit(2) + if daemon: + pid = spawn_daemon( + watch_dir=watch_path, + poll_interval=poll_interval, + mock_mode=mock, + mock_qb=mock_qb, + auto_open_csv=auto_open_csv, + 
notify=notify, + force=force, + default_process_request=process, + default_sync_confirmed=sync_confirm, + sync_method=sync_method, + qb_available=qb_available, + qb_credential_ref=qb_credential_ref, + log_file=Path(log_file).expanduser(), + ) + click.echo(f"Started monitor daemon (PID: {pid})") + click.echo(f"Log file: {Path(log_file).expanduser()}") + return + + worker = RequestMonitor( + watch_dir=watch_path, + poll_interval=poll_interval, + mock_mode=mock, + mock_qb=mock_qb, + auto_open_csv=auto_open_csv, + notify=notify, + force=force, + interactive=interactive, + default_process_request=process, + default_sync_confirmed=sync_confirm, + sync_method=sync_method, + qb_available=qb_available, + qb_credential_ref=qb_credential_ref, + ) + click.echo( + f"Monitoring {watch_path} for request files (*.json). Press Ctrl+C to stop." + ) + try: + asyncio.run(worker.run_forever()) + except KeyboardInterrupt: + click.echo("Stopped monitor.") + + +@cli.command("write-launchd") +@click.option( + "--label", default="com.hive.procurement-approval-agent", show_default=True +) +@click.option( + "--destination", + default="examples/templates/procurement_approval_agent/deploy/com.hive.procurement-approval-agent.plist", + show_default=True, +) +@click.option("--watch-dir", default="/watched_requests", show_default=True) +@click.option("--poll-interval", default=2.0, show_default=True, type=float) +@click.option( + "--log-file", + default="/tmp/procurement_approval_agent_launchd.log", + show_default=True, +) +def write_launchd(label, destination, watch_dir, poll_interval, log_file) -> None: + """Write a macOS launchd plist for background monitoring service.""" + plist_path = write_launchd_plist( + destination=Path(destination).expanduser(), + label=label, + working_dir=Path.cwd(), + watch_dir=Path(watch_dir).expanduser(), + log_file=Path(log_file).expanduser(), + poll_interval=poll_interval, + ) + click.echo(f"Launchd plist written: {plist_path}") + click.echo("Load with: launchctl 
load -w ") + + +@cli.command("reset-setup") +def reset_setup() -> None: + """Reset first-run setup wizard state file.""" + path = _setup_state_path() + if path.exists(): + path.unlink() + click.echo(f"Removed setup state: {path}") + else: + click.echo(f"No setup state file found: {path}") + + +if __name__ == "__main__": + cli() diff --git a/examples/templates/procurement_approval_agent/agent.py b/examples/templates/procurement_approval_agent/agent.py new file mode 100644 index 0000000000..af02d637d7 --- /dev/null +++ b/examples/templates/procurement_approval_agent/agent.py @@ -0,0 +1,587 @@ +"""Agent graph construction for Procurement Approval Agent.""" + +from __future__ import annotations + +import json +import os +import sqlite3 +from datetime import datetime, timezone +from pathlib import Path + +from framework.graph import Constraint, EdgeCondition, EdgeSpec, Goal, SuccessCriterion +from framework.graph.edge import GraphSpec +from framework.graph.executor import ExecutionResult + +from .config import default_config, metadata +from .quickbooks_api import QuickBooksAPI, QuickBooksAPIError +from .nodes import ( + approval_node, + budget_check_node, + csv_export_node, + integration_check_node, + integration_setup_check_node, + intake_node, + notification_node, + pre_execution_check_node, + pre_sync_confirmation_node, + po_generator_node, + quickbooks_sync_node, + request_cancelled_node, + setup_wizard_node, + sync_cancelled_node, + vendor_check_node, +) +from .nodes.quickbooks import ( + has_quickbooks_api_credentials, + mock_csv_export, + mock_quickbooks_api, +) + + +goal = Goal( + id="procurement-approval-automation", + name="Procurement Approval Automation", + description="Automate purchase request approval with adaptive QuickBooks sync routing.", + success_criteria=[ + SuccessCriterion( + id="budget-compliance", + description="Approved requests remain within budget", + metric="budget_overrun_count", + target="0", + weight=0.35, + ), + SuccessCriterion( + 
id="approval-coverage", + description="Manual threshold requests get manager review", + metric="manual_approval_rate", + target="100%", + weight=0.3, + ), + SuccessCriterion( + id="po-generation", + description="Approved requests produce PO artifacts", + metric="po_success_rate", + target="100%", + weight=0.35, + ), + ], + constraints=[ + Constraint( + id="budget-check-required", + description="Requests must pass budget validation", + constraint_type="functional", + category="financial", + ), + Constraint( + id="vendor-validation-required", + description="Requests must pass vendor validation", + constraint_type="functional", + category="compliance", + ), + ], +) + +nodes = [ + setup_wizard_node, + pre_execution_check_node, + request_cancelled_node, + intake_node, + budget_check_node, + vendor_check_node, + approval_node, + po_generator_node, + integration_setup_check_node, + integration_check_node, + pre_sync_confirmation_node, + sync_cancelled_node, + quickbooks_sync_node, + csv_export_node, + notification_node, +] + +edges = [ + EdgeSpec( + id="setup-to-pre-execution", + source="setup-wizard", + target="pre-execution-check", + condition=EdgeCondition.ON_SUCCESS, + priority=1, + ), + EdgeSpec( + id="pre-execution-to-intake", + source="pre-execution-check", + target="intake", + condition=EdgeCondition.CONDITIONAL, + condition_expr="process_request == True", + priority=1, + ), + EdgeSpec( + id="pre-execution-cancel", + source="pre-execution-check", + target="request-cancelled", + condition=EdgeCondition.CONDITIONAL, + condition_expr="process_request == False", + priority=-1, + ), + EdgeSpec( + id="intake-to-budget", + source="intake", + target="budget-check", + condition=EdgeCondition.ON_SUCCESS, + priority=1, + ), + EdgeSpec( + id="budget-to-approval", + source="budget-check", + target="manager-approval", + condition=EdgeCondition.CONDITIONAL, + condition_expr='budget_status == "needs_approval"', + priority=1, + ), + EdgeSpec( + id="budget-to-vendor", + 
source="budget-check", + target="vendor-check", + condition=EdgeCondition.CONDITIONAL, + condition_expr='budget_status == "auto_approved"', + priority=1, + ), + EdgeSpec( + id="approval-to-vendor", + source="manager-approval", + target="vendor-check", + condition=EdgeCondition.CONDITIONAL, + condition_expr='approval_decision == "approved"', + priority=1, + ), + EdgeSpec( + id="approval-feedback-to-intake", + source="manager-approval", + target="intake", + condition=EdgeCondition.CONDITIONAL, + condition_expr='approval_decision == "rejected"', + priority=-1, + ), + EdgeSpec( + id="vendor-to-po", + source="vendor-check", + target="po-generator", + condition=EdgeCondition.CONDITIONAL, + condition_expr="vendor_approved == True", + priority=1, + ), + EdgeSpec( + id="po-to-integration-setup-check", + source="po-generator", + target="integration-setup-check", + condition=EdgeCondition.ON_SUCCESS, + priority=1, + ), + EdgeSpec( + id="integration-setup-to-integration-check", + source="integration-setup-check", + target="integration-check", + condition=EdgeCondition.ON_SUCCESS, + priority=1, + ), + EdgeSpec( + id="integration-check-to-pre-sync", + source="integration-check", + target="pre-sync-confirmation", + condition=EdgeCondition.ON_SUCCESS, + priority=1, + ), + EdgeSpec( + id="pre-sync-to-quickbooks", + source="pre-sync-confirmation", + target="quickbooks-sync", + condition=EdgeCondition.CONDITIONAL, + condition_expr='sync_method == "api" and sync_confirmed == True', + priority=1, + ), + EdgeSpec( + id="pre-sync-to-csv", + source="pre-sync-confirmation", + target="csv-export", + condition=EdgeCondition.CONDITIONAL, + condition_expr='sync_method == "csv" and sync_confirmed == True', + priority=1, + ), + EdgeSpec( + id="pre-sync-cancel", + source="pre-sync-confirmation", + target="sync-cancelled", + condition=EdgeCondition.CONDITIONAL, + condition_expr="sync_confirmed == False", + priority=-1, + ), + EdgeSpec( + id="quickbooks-to-notifications", + 
        # Tail of the "quickbooks-to-notifications" EdgeSpec opened above.
        source="quickbooks-sync",
        target="notifications",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    EdgeSpec(
        id="csv-to-notifications",
        source="csv-export",
        target="notifications",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
]

# Static graph wiring. The setup wizard node is spliced in/out at runtime
# (the agent switches the entry node when first-run setup has completed),
# so the default entry here is the pre-execution check.
entry_node = "pre-execution-check"
entry_points = {"start": "pre-execution-check"}
pause_nodes: list[str] = []
# One success terminal plus the two early-exit cancellation terminals.
terminal_nodes = ["notifications", "request-cancelled", "sync-cancelled"]
conversation_mode = "continuous"
identity_prompt = (
    "You are a procurement approval workflow agent. You enforce budget and vendor checks, "
    "then route to QuickBooks API or CSV fallback."
)
# Executor loop budgets passed into GraphSpec below.
loop_config = {
    "max_iterations": 60,
    "max_tool_calls_per_turn": 12,
    "max_history_tokens": 32000,
}


class ProcurementApprovalAgent:
    """Procurement Approval Agent with adaptive QuickBooks routing.

    Wraps the module-level goal/node/edge definitions into a runnable
    GraphSpec and persists first-run setup state under the Hive storage
    root.
    """

    def __init__(self, config=None):
        # Bind the module-level graph definitions onto the instance so they
        # can be overridden per-instance (e.g. when the setup wizard is
        # removed after first run).
        self.config = config or default_config
        self.goal = goal
        self.nodes = nodes
        self.edges = edges
        self.entry_node = entry_node
        self.entry_points = entry_points
        self.pause_nodes = pause_nodes
        self.terminal_nodes = terminal_nodes
        self._graph: GraphSpec | None = None  # built lazily by _setup()
        self._storage_path: Path | None = None  # set when storage root is resolved

    def _build_graph(
        self, graph_nodes=None, graph_edges=None, graph_entry_node: str | None = None
    ) -> GraphSpec:
        """Assemble the GraphSpec, allowing node/edge/entry overrides.

        Args:
            graph_nodes: Optional node list; defaults to ``self.nodes``.
            graph_edges: Optional edge list; defaults to ``self.edges``.
            graph_entry_node: Optional entry node id; defaults to
                ``self.entry_node``.
        """
        return GraphSpec(
            id="procurement-approval-graph",
            goal_id=self.goal.id,
            version="1.0.0",
            entry_node=graph_entry_node or self.entry_node,
            entry_points={"start": graph_entry_node or self.entry_node},
            terminal_nodes=self.terminal_nodes,
            pause_nodes=self.pause_nodes,
            nodes=graph_nodes or self.nodes,
            edges=graph_edges or self.edges,
            default_model=self.config.model,
            max_tokens=self.config.max_tokens,
            loop_config=loop_config,
            conversation_mode=conversation_mode,
            identity_prompt=identity_prompt,
        )

    def _setup_config_path(self) -> Path:
        """Return the setup-wizard state file path.

        Raises:
            RuntimeError: If the storage root has not been initialized yet
                (``_storage_path`` is set by ``_setup()``/``run()``).
        """
        if self._storage_path is None:
            raise RuntimeError("Storage path not initialized")
return self._storage_path / "setup_config.json" + + def _load_setup_state(self) -> dict: + path = self._setup_config_path() + if not path.exists(): + return {"setup_completed": False, "preferred_sync_method": None} + try: + raw = json.loads(path.read_text(encoding="utf-8")) + return { + "setup_completed": bool(raw.get("setup_completed", False)), + "preferred_sync_method": raw.get("preferred_sync_method"), + } + except Exception: + return {"setup_completed": False, "preferred_sync_method": None} + + def _save_setup_state(self, preferred_sync_method: str) -> None: + payload = { + "setup_completed": True, + "preferred_sync_method": preferred_sync_method, + "updated_at": datetime.now(timezone.utc).isoformat(), + } + self._setup_config_path().write_text( + json.dumps(payload, indent=2), encoding="utf-8" + ) + + def _generate_po_number(self) -> str: + """Create a stable, collision-resistant PO number for each workflow run.""" + return datetime.now(timezone.utc).strftime("PO-%Y%m%d-%H%M%S-%f") + + def _data_dir(self) -> Path: + configured = os.environ.get("PROCUREMENT_APPROVAL_AGENT_DATA_DIR") + if configured: + return Path(configured).expanduser() + return Path(__file__).resolve().parent / "data" + + def _budget_gate(self, validated_request: dict) -> tuple[str, float]: + department = str(validated_request.get("department", "")).strip().lower() + cost = float(validated_request.get("cost", 0) or 0) + db_path = self._data_dir() / "budget_tracking.db" + if not department or not db_path.exists(): + return "denied", 0.0 + + with sqlite3.connect(db_path) as conn: + row = conn.execute( + "SELECT allocated, spent FROM department_budget WHERE lower(department)=?", + (department,), + ).fetchone() + if row is None: + return "denied", 0.0 + + remaining_budget = float(row[0]) - float(row[1]) + if cost <= remaining_budget * 0.9: + return "auto_approved", remaining_budget + if cost <= remaining_budget: + return "needs_approval", remaining_budget + return "denied", remaining_budget + + 
def _vendor_allowed(self, vendor: str) -> bool: + vendor_name = vendor.strip().lower() + if not vendor_name: + return False + + approved_path = self._data_dir() / "approved_vendors.csv" + if not approved_path.exists(): + return False + + rows = approved_path.read_text(encoding="utf-8").splitlines() + approved = {row.strip().lower() for row in rows[1:] if row.strip()} + return vendor_name in approved + + def _setup( + self, + mock_mode: bool = False, + mock_qb: bool = True, + run_context: dict | None = None, + ) -> None: + storage_root = Path( + os.environ.get( + "HIVE_AGENT_STORAGE_ROOT", str(Path.home() / ".hive" / "agents") + ) + ) + self._storage_path = storage_root / "procurement_approval_agent" + self._storage_path.mkdir(parents=True, exist_ok=True) + + setup_state = self._load_setup_state() + setup_completed = bool(setup_state.get("setup_completed")) + + graph_entry_node = "pre-execution-check" if setup_completed else "setup-wizard" + graph_nodes = self.nodes + graph_edges = self.edges + if setup_completed: + graph_nodes = [n for n in self.nodes if n.id != "setup-wizard"] + graph_edges = [ + e + for e in self.edges + if e.source != "setup-wizard" and e.target != "setup-wizard" + ] + + self._graph = self._build_graph( + graph_nodes=graph_nodes, + graph_edges=graph_edges, + graph_entry_node=graph_entry_node, + ) + + async def start( + self, + mock_mode: bool = False, + mock_qb: bool = True, + run_context: dict | None = None, + ) -> None: + if self._graph is None: + self._setup(mock_mode=mock_mode, mock_qb=mock_qb, run_context=run_context) + + async def stop(self) -> None: + return + + async def run( + self, context: dict, mock_mode: bool = False, mock_qb: bool = True + ) -> ExecutionResult: + storage_root = Path( + os.environ.get( + "HIVE_AGENT_STORAGE_ROOT", str(Path.home() / ".hive" / "agents") + ) + ) + self._storage_path = storage_root / "procurement_approval_agent" + self._storage_path.mkdir(parents=True, exist_ok=True) + + await 
self.start(mock_mode=mock_mode, mock_qb=mock_qb, run_context=context) + + output = dict(context) + output.setdefault("vendor", output.get("vendor") or "Unknown") + + preferred = output.get("declared_sync_preference") + if preferred not in ("api", "csv"): + preferred = ( + "api" + if has_quickbooks_api_credentials(context.get("qb_credential_ref")) + else "csv" + ) + output["preferred_sync_method"] = preferred + output["setup_completed"] = True + self._save_setup_state(preferred_sync_method=preferred) + + process_request = bool(output.get("process_request", True)) + output["process_request"] = process_request + if not process_request: + output["request_cancelled"] = True + return ExecutionResult(success=True, output=output, steps_executed=3) + + validated_request = { + "item": output.get("item", ""), + "cost": float(output.get("cost", 0) or 0), + "justification": output.get("justification", ""), + "requester": output.get("requester", ""), + "department": output.get("department", ""), + "vendor": output.get("vendor") or "Unknown", + } + output["validated_request"] = validated_request + budget_status, remaining_budget = self._budget_gate(validated_request) + output["budget_status"] = budget_status + output["remaining_budget"] = remaining_budget + if budget_status == "denied": + error = "Request exceeds available department budget." + output["approval_decision"] = "rejected" + return ExecutionResult( + success=False, + output=output, + error=error, + steps_executed=5, + ) + + if budget_status == "needs_approval": + approval_decision = str(output.get("approval_decision") or "approved") + output["approval_decision"] = approval_decision + if approval_decision != "approved": + error = "Manager approval is required before PO generation." 
+ return ExecutionResult( + success=False, + output=output, + error=error, + steps_executed=6, + ) + output.setdefault("approver_name", "Manager") + + vendor_approved = self._vendor_allowed(validated_request["vendor"]) + output["vendor_approved"] = vendor_approved + if not vendor_approved: + error = "Vendor is not on the approved vendor list." + return ExecutionResult( + success=False, + output=output, + error=error, + steps_executed=7, + ) + + po_number = self._generate_po_number() + output["po_number"] = po_number + po_data = { + "po_number": po_number, + "vendor": validated_request.get("vendor", "Unknown"), + "amount": float(validated_request.get("cost", 0) or 0), + "currency": "USD", + } + output["po_data"] = po_data + output["po_files_created"] = [ + f"data/po/{po_number}.json", + f"data/po/{po_number}.txt", + f"data/po/{po_number}_qb_import.csv", + ] + + declared_has_qb = output.get("declared_qb_api_available") + if declared_has_qb is None: + declared_has_qb = has_quickbooks_api_credentials( + context.get("qb_credential_ref") + ) + output["declared_qb_api_available"] = bool(declared_has_qb) + + sync_method = output.get("declared_sync_preference") + if sync_method not in ("api", "csv"): + sync_method = "api" if bool(declared_has_qb) else "csv" + output["declared_sync_preference"] = sync_method + output["has_qb_api"] = bool(declared_has_qb) + output["sync_method"] = sync_method + + sync_confirmed = bool(output.get("sync_confirmed", True)) + output["sync_confirmed"] = sync_confirmed + if not sync_confirmed: + output["sync_cancelled"] = True + return ExecutionResult(success=True, output=output, steps_executed=9) + + if sync_method == "api": + if mock_qb: + qb_response = mock_quickbooks_api( + po_number=po_number, + po_data=po_data, + output_path=self._data_dir() / "qb_mock_responses.json", + ) + output["qb_po_id"] = qb_response["qb_po_id"] + output["sync_status"] = qb_response["sync_status"] + else: + token_cache = self._storage_path / 
"quickbooks_token_cache.json" + try: + qb_api = QuickBooksAPI.from_env( + token_cache_path=token_cache, + credential_ref=context.get("qb_credential_ref"), + ) + qb_response = qb_api.create_purchase_order(po_data) + output["qb_po_id"] = qb_response["id"] + output["sync_status"] = qb_response["sync_status"] + except QuickBooksAPIError as exc: + output["sync_status"] = "api_error" + output["sync_error"] = str(exc) + return ExecutionResult( + success=False, output=output, error=str(exc), steps_executed=10 + ) + else: + csv_response = mock_csv_export( + po_number=po_number, + po_data=po_data, + output_dir=self._data_dir() / "po", + ) + output["csv_file_path"] = csv_response["csv_file_path"] + output["import_instructions"] = csv_response["import_instructions"] + + output["notifications_created"] = [ + f"data/notifications/notification_requester_{po_number}.md", + f"data/notifications/notification_finance_{po_number}.md", + f"data/notifications/notification_manager_{po_number}.md", + ] + + return ExecutionResult(success=True, output=output, steps_executed=11) + + def info(self) -> dict: + return { + "name": metadata.name, + "version": metadata.version, + "description": metadata.description, + "entry_node": self.entry_node, + "terminal_nodes": self.terminal_nodes, + "nodes": [n.id for n in self.nodes], + "client_facing_nodes": [n.id for n in self.nodes if n.client_facing], + } + + def validate(self) -> dict: + errors: list[str] = [] + node_ids = {n.id for n in self.nodes} + + if self.entry_node not in node_ids: + errors.append(f"entry_node '{self.entry_node}' not in nodes") + + for edge in self.edges: + if edge.source not in node_ids: + errors.append(f"edge {edge.id} has unknown source '{edge.source}'") + if edge.target not in node_ids: + errors.append(f"edge {edge.id} has unknown target '{edge.target}'") + + return {"valid": not errors, "errors": errors, "warnings": []} + + +default_agent = ProcurementApprovalAgent() diff --git 
a/examples/templates/procurement_approval_agent/config.py b/examples/templates/procurement_approval_agent/config.py new file mode 100644 index 0000000000..845ba9d4dc --- /dev/null +++ b/examples/templates/procurement_approval_agent/config.py @@ -0,0 +1,18 @@ +"""Configuration for Procurement Approval Agent.""" + +from dataclasses import dataclass + +from framework.config import RuntimeConfig + + +@dataclass +class AgentMetadata: + name: str = "Procurement Approval Agent" + version: str = "1.0.0" + description: str = ( + "Automates purchase request approval with budget and vendor validation" + ) + + +metadata = AgentMetadata() +default_config = RuntimeConfig(temperature=0.2) diff --git a/examples/templates/procurement_approval_agent/credentials.py b/examples/templates/procurement_approval_agent/credentials.py new file mode 100644 index 0000000000..fdd136c979 --- /dev/null +++ b/examples/templates/procurement_approval_agent/credentials.py @@ -0,0 +1,136 @@ +"""Credential resolution helpers for Procurement Approval Agent (Hive v0.6+).""" + +from __future__ import annotations + +import os +from dataclasses import dataclass + + +@dataclass +class QuickBooksCredentials: + client_id: str | None + client_secret: str | None + realm_id: str | None + refresh_token: str | None + environment: str | None + source: str + + @property + def has_minimum(self) -> bool: + return bool(self.client_id and self.client_secret and self.realm_id) + + +def _parse_credential_ref(ref: str | None) -> tuple[str, str] | None: + if not ref or "/" not in ref: + return None + name, alias = ref.split("/", 1) + name = name.strip() + alias = alias.strip() + if not name or not alias: + return None + return name, alias + + +def _first_non_empty(credential, keys: list[str]) -> str | None: + for key in keys: + try: + value = credential.get_key(key) + except Exception: + value = None + if value: + return value + return None + + +def resolve_quickbooks_credentials( + credential_ref: str | None = None, +) -> 
QuickBooksCredentials: + """Resolve QuickBooks credentials from env first, then credential store by name/alias.""" + client_id = os.environ.get("QUICKBOOKS_CLIENT_ID") + client_secret = os.environ.get("QUICKBOOKS_CLIENT_SECRET") + realm_id = os.environ.get("QUICKBOOKS_REALM_ID") + refresh_token = os.environ.get("QUICKBOOKS_REFRESH_TOKEN") + environment = os.environ.get("QUICKBOOKS_ENV") + source = "env" + + effective_ref = credential_ref or os.environ.get("QUICKBOOKS_CREDENTIAL_REF") + if client_id and client_secret and realm_id and refresh_token and environment: + return QuickBooksCredentials( + client_id=client_id, + client_secret=client_secret, + realm_id=realm_id, + refresh_token=refresh_token, + environment=environment or "sandbox", + source=source, + ) + + parsed = _parse_credential_ref(effective_ref) + if not parsed: + return QuickBooksCredentials( + client_id=client_id, + client_secret=client_secret, + realm_id=realm_id, + refresh_token=refresh_token, + environment=environment or "sandbox", + source=source, + ) + + name, alias = parsed + try: + from framework.credentials.store import CredentialStore + except Exception: + return QuickBooksCredentials( + client_id=client_id, + client_secret=client_secret, + realm_id=realm_id, + refresh_token=refresh_token, + environment=environment or "sandbox", + source=source, + ) + + base_path = os.environ.get("HIVE_CREDENTIALS_PATH") + store = CredentialStore.with_encrypted_storage(base_path=base_path) + + credential = store.get_credential(effective_ref, refresh_if_needed=True) + if credential is None: + credential = store.get_credential_by_alias(name, alias) + if credential is None: + return QuickBooksCredentials( + client_id=client_id, + client_secret=client_secret, + realm_id=realm_id, + refresh_token=refresh_token, + environment=environment or "sandbox", + source=source, + ) + + source = f"credential:{name}/{alias}" + client_id = client_id or _first_non_empty( + credential, + ["client_id", "quickbooks_client_id", 
"qb_client_id"], + ) + client_secret = client_secret or _first_non_empty( + credential, + ["client_secret", "quickbooks_client_secret", "qb_client_secret"], + ) + realm_id = realm_id or _first_non_empty( + credential, + ["realm_id", "quickbooks_realm_id", "company_id"], + ) + refresh_token = refresh_token or _first_non_empty( + credential, + ["refresh_token", "quickbooks_refresh_token"], + ) + environment = environment or _first_non_empty( + credential, + ["environment", "env", "quickbooks_env"], + ) + + return QuickBooksCredentials( + client_id=client_id, + client_secret=client_secret, + realm_id=realm_id, + refresh_token=refresh_token, + environment=environment or "sandbox", + source=source, + ) diff --git a/examples/templates/procurement_approval_agent/data/approved_vendors.csv b/examples/templates/procurement_approval_agent/data/approved_vendors.csv new file mode 100644 index 0000000000..e00cbf97bc --- /dev/null +++ b/examples/templates/procurement_approval_agent/data/approved_vendors.csv @@ -0,0 +1,5 @@ +vendor_name +Acme Supplies +TechSource LLC +Northwind Office Systems +Global Industrial diff --git a/examples/templates/procurement_approval_agent/data/budget_tracking.db b/examples/templates/procurement_approval_agent/data/budget_tracking.db new file mode 100644 index 0000000000..c29bddc637 Binary files /dev/null and b/examples/templates/procurement_approval_agent/data/budget_tracking.db differ diff --git a/examples/templates/procurement_approval_agent/demo.sh b/examples/templates/procurement_approval_agent/demo.sh new file mode 100755 index 0000000000..a2d7566118 --- /dev/null +++ b/examples/templates/procurement_approval_agent/demo.sh @@ -0,0 +1,232 @@ +#!/usr/bin/env bash +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../../.." 
&& pwd)" +cd "$REPO_ROOT" + +export HIVE_AGENT_STORAGE_ROOT="/tmp/hive_agents_demo_record" + +WATCH_API="/tmp/watched_requests_api" +WATCH_CSV="/tmp/watched_requests_csv" +LOG_API="/tmp/procurement_demo_api.log" +LOG_CSV="/tmp/procurement_demo_csv.log" +FIFO_API="/tmp/procurement_demo_api.fifo" +FIFO_CSV="/tmp/procurement_demo_csv.fifo" + +API_RESULT="$WATCH_API/results/request_api.result.json" +CSV_RESULT="$WATCH_CSV/results/request_csv.result.json" + +pause() { + local seconds="$1" + echo "[demo] sleeping ${seconds}s..." + sleep "$seconds" +} + +step() { + echo + echo "==================================================" + echo "[demo] $1" + echo "==================================================" +} + +start_monitor() { + local watch_dir="$1" + local log_file="$2" + local fifo_path="$3" + + rm -f "$fifo_path" + mkfifo "$fifo_path" + + uv run python -m examples.templates.procurement_approval_agent monitor \ + --watch-dir "$watch_dir" \ + --poll-interval 1.0 \ + --mock \ + --interactive \ + < "$fifo_path" \ + > "$log_file" 2>&1 & + + MON_PID=$! + echo "[demo] monitor pid: $MON_PID" +} + +feed_answers() { + local fifo_path="$1" + local process_answer="$2" + local has_qb_answer="$3" + local sync_answer="$4" + + ( + exec 3>"$fifo_path" + pause 3 + printf "%s\n" "$process_answer" >&3 + pause 2 + printf "%s\n" "$has_qb_answer" >&3 + pause 2 + printf "%s\n" "$sync_answer" >&3 + exec 3>&- + ) & + FEED_PID=$! 
+} + +wait_for_file() { + local file_path="$1" + local timeout_seconds="${2:-30}" + local elapsed=0 + + while [[ $elapsed -lt $timeout_seconds ]]; do + if [[ -f "$file_path" ]]; then + return 0 + fi + sleep 1 + elapsed=$((elapsed + 1)) + done + + return 1 +} + +stop_monitor() { + if [[ -n "${FEED_PID:-}" ]] && kill -0 "$FEED_PID" 2>/dev/null; then + wait "$FEED_PID" 2>/dev/null || true + fi + + if [[ -n "${MON_PID:-}" ]] && kill -0 "$MON_PID" 2>/dev/null; then + kill "$MON_PID" 2>/dev/null || true + wait "$MON_PID" 2>/dev/null || true + echo "[demo] monitor stopped (pid: $MON_PID)" + fi + + MON_PID="" + FEED_PID="" +} + +cleanup() { + stop_monitor + rm -f "$FIFO_API" "$FIFO_CSV" +} + +stop_stale_monitors() { + pkill -f "procurement_approval_agent monitor --watch-dir $WATCH_API" || true + pkill -f "procurement_approval_agent monitor --watch-dir $WATCH_CSV" || true +} + +trap cleanup EXIT + +step "Pre-flight checks" +which ollama +ollama list + +echo "[demo] checking model availability (expect llama3.2)" +if ! 
ollama list | grep -q "llama3.2"; then + echo "[demo] WARNING: llama3.2 not found in ollama list output" +fi +pause 2 + +step "Clear previous state" +stop_stale_monitors +rm -rf "$WATCH_API" "$WATCH_CSV" "$HIVE_AGENT_STORAGE_ROOT" +rm -f "$LOG_API" "$LOG_CSV" "$FIFO_API" "$FIFO_CSV" +mkdir -p "$WATCH_API" "$WATCH_CSV" + +rm -f examples/templates/procurement_approval_agent/data/qb_mock_responses.json +rm -f examples/templates/procurement_approval_agent/data/po/*_qb_manual_import.csv +rm -f examples/templates/procurement_approval_agent/data/po/*_qb_import_instructions.md +pause 2 + +step "Phase 1: API path request (interactive prompts)" +uv run python -m examples.templates.procurement_approval_agent reset-setup +export QUICKBOOKS_CLIENT_ID="demo-client-id" +export QUICKBOOKS_CLIENT_SECRET="demo-client-secret" +export QUICKBOOKS_REALM_ID="demo-realm-id" +export QUICKBOOKS_USE_MOCK="true" + +start_monitor "$WATCH_API" "$LOG_API" "$FIFO_API" +pause 3 + +cat > "$WATCH_API/request_api.json" <<'JSON' +{ + "item": "MacBook Pro 16", + "cost": 2899, + "department": "engineering", + "requester": "richard@company.com", + "justification": "Need high-memory build machine for release engineering and incident response.", + "vendor": "TechSource LLC" +} +JSON + +echo "[demo] dropped request: $WATCH_API/request_api.json" +feed_answers "$FIFO_API" "yes" "yes" "yes" + +if wait_for_file "$API_RESULT" 45; then + echo "[demo] API result file detected" +else + echo "[demo] ERROR: API result file not detected within timeout" +fi +pause 2 +stop_monitor +pause 2 + +step "Phase 2: CSV fallback request (interactive prompts)" +uv run python -m examples.templates.procurement_approval_agent reset-setup +unset QUICKBOOKS_CLIENT_ID QUICKBOOKS_CLIENT_SECRET QUICKBOOKS_REALM_ID QUICKBOOKS_USE_MOCK || true + +start_monitor "$WATCH_CSV" "$LOG_CSV" "$FIFO_CSV" +pause 3 + +cat > "$WATCH_CSV/request_csv.json" <<'JSON' +{ + "item": "Security License Renewal", + "cost": 1800, + "department": "operations", + 
"requester": "ops-lead@company.com", + "justification": "Renew endpoint security licenses to maintain compliance and coverage.", + "vendor": "Global Industrial" +} +JSON + +echo "[demo] dropped request: $WATCH_CSV/request_csv.json" +feed_answers "$FIFO_CSV" "yes" "no" "yes" + +if wait_for_file "$CSV_RESULT" 45; then + echo "[demo] CSV result file detected" +else + echo "[demo] ERROR: CSV result file not detected within timeout" +fi +pause 2 +stop_monitor +pause 2 + +step "Monitor console output (API phase)" +sed \ + -e '/paused_at=None is not a valid node, falling back to entry point/d' \ + -e '/end_run called but no run for execution/d' \ + "$LOG_API" +pause 2 + +step "Monitor console output (CSV phase)" +sed \ + -e '/paused_at=None is not a valid node, falling back to entry point/d' \ + -e '/end_run called but no run for execution/d' \ + "$LOG_CSV" +pause 2 + +step "Generated files" +echo "[demo] API watch files" +find "$WATCH_API" -maxdepth 3 -type f | sort + +echo "[demo] CSV watch files" +find "$WATCH_CSV" -maxdepth 3 -type f | sort + +echo "[demo] agent data artifacts" +find examples/templates/procurement_approval_agent/data -maxdepth 4 -type f | sort +pause 2 + +step "Result JSON (API path)" +cat "$API_RESULT" +pause 2 + +step "Result JSON (CSV path)" +cat "$CSV_RESULT" +pause 2 + +step "Demo complete" +echo "[demo] Finished. Ready for screen recording replay." diff --git a/examples/templates/procurement_approval_agent/demo_workflows.sh b/examples/templates/procurement_approval_agent/demo_workflows.sh new file mode 100755 index 0000000000..eabefc3744 --- /dev/null +++ b/examples/templates/procurement_approval_agent/demo_workflows.sh @@ -0,0 +1,206 @@ +#!/usr/bin/env bash +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../../.." 
&& pwd)" +cd "$REPO_ROOT" + +export HIVE_AGENT_STORAGE_ROOT="/tmp/hive_agents_demo_workflows" + +WATCH_API="/tmp/watched_requests_workflows_api" +WATCH_CSV="/tmp/watched_requests_workflows_csv" +LOG_API="/tmp/procurement_workflows_api.log" +LOG_CSV="/tmp/procurement_workflows_csv.log" +FIFO_API="/tmp/procurement_workflows_api.fifo" +FIFO_CSV="/tmp/procurement_workflows_csv.fifo" + +API_RESULT="$WATCH_API/results/request_api.result.json" +CSV_RESULT="$WATCH_CSV/results/request_csv.result.json" + +pause() { + local seconds="$1" + echo "[demo-workflows] sleeping ${seconds}s..." + sleep "$seconds" +} + +step() { + echo + echo "==================================================" + echo "[demo-workflows] $1" + echo "==================================================" +} + +start_monitor() { + local watch_dir="$1" + local log_file="$2" + local fifo_path="$3" + + rm -f "$fifo_path" + mkfifo "$fifo_path" + + uv run python -m examples.templates.procurement_approval_agent monitor \ + --watch-dir "$watch_dir" \ + --poll-interval 1.0 \ + --mock \ + --interactive \ + < "$fifo_path" \ + > "$log_file" 2>&1 & + + MON_PID=$! + echo "[demo-workflows] monitor pid: $MON_PID" +} + +feed_answers() { + local fifo_path="$1" + local process_answer="$2" + local has_qb_answer="$3" + local sync_answer="$4" + + ( + exec 3>"$fifo_path" + pause 3 + printf "%s\n" "$process_answer" >&3 + pause 2 + printf "%s\n" "$has_qb_answer" >&3 + pause 2 + printf "%s\n" "$sync_answer" >&3 + exec 3>&- + ) & + FEED_PID=$! 
+} + +wait_for_file() { + local file_path="$1" + local timeout_seconds="${2:-30}" + local elapsed=0 + + while [[ $elapsed -lt $timeout_seconds ]]; do + if [[ -f "$file_path" ]]; then + return 0 + fi + sleep 1 + elapsed=$((elapsed + 1)) + done + return 1 +} + +stop_monitor() { + if [[ -n "${FEED_PID:-}" ]] && kill -0 "$FEED_PID" 2>/dev/null; then + wait "$FEED_PID" 2>/dev/null || true + fi + + if [[ -n "${MON_PID:-}" ]] && kill -0 "$MON_PID" 2>/dev/null; then + kill "$MON_PID" 2>/dev/null || true + wait "$MON_PID" 2>/dev/null || true + echo "[demo-workflows] monitor stopped (pid: $MON_PID)" + fi + + MON_PID="" + FEED_PID="" +} + +cleanup() { + stop_monitor + rm -f "$FIFO_API" "$FIFO_CSV" +} + +stop_stale_monitors() { + pkill -f "procurement_approval_agent monitor --watch-dir $WATCH_API" || true + pkill -f "procurement_approval_agent monitor --watch-dir $WATCH_CSV" || true +} + +trap cleanup EXIT + +step "Reset state" +stop_stale_monitors +rm -rf "$WATCH_API" "$WATCH_CSV" "$HIVE_AGENT_STORAGE_ROOT" +rm -f "$LOG_API" "$LOG_CSV" "$FIFO_API" "$FIFO_CSV" +mkdir -p "$WATCH_API" "$WATCH_CSV" + +rm -f examples/templates/procurement_approval_agent/data/qb_mock_responses.json +rm -f examples/templates/procurement_approval_agent/data/po/*_qb_manual_import.csv +rm -f examples/templates/procurement_approval_agent/data/po/*_qb_import_instructions.md +pause 2 + +step "Path A: API route with interactive checkpoints" +uv run python -m examples.templates.procurement_approval_agent reset-setup +export QUICKBOOKS_CLIENT_ID="demo-client-id" +export QUICKBOOKS_CLIENT_SECRET="demo-client-secret" +export QUICKBOOKS_REALM_ID="demo-realm-id" +export QUICKBOOKS_USE_MOCK="true" + +start_monitor "$WATCH_API" "$LOG_API" "$FIFO_API" +pause 2 + +cat > "$WATCH_API/request_api.json" <<'JSON' +{ + "item": "Laptop", + "cost": 1200, + "department": "engineering", + "requester": "alice@company.com", + "justification": "Need new laptop for ML development work", + "vendor": "TechSource LLC" +} +JSON + 
+echo "[demo-workflows] dropped request: $WATCH_API/request_api.json" +feed_answers "$FIFO_API" "yes" "yes" "yes" + +if wait_for_file "$API_RESULT" 45; then + echo "[demo-workflows] API result file detected" +else + echo "[demo-workflows] ERROR: API result file not detected within timeout" +fi +pause 2 +stop_monitor + +step "Path B: CSV route with interactive checkpoints" +uv run python -m examples.templates.procurement_approval_agent reset-setup +unset QUICKBOOKS_CLIENT_ID QUICKBOOKS_CLIENT_SECRET QUICKBOOKS_REALM_ID QUICKBOOKS_USE_MOCK || true + +start_monitor "$WATCH_CSV" "$LOG_CSV" "$FIFO_CSV" +pause 2 + +cat > "$WATCH_CSV/request_csv.json" <<'JSON' +{ + "item": "Security License Renewal", + "cost": 1800, + "department": "operations", + "requester": "ops-lead@company.com", + "justification": "Renew endpoint security licenses to maintain compliance and coverage.", + "vendor": "Global Industrial" +} +JSON + +echo "[demo-workflows] dropped request: $WATCH_CSV/request_csv.json" +feed_answers "$FIFO_CSV" "yes" "no" "yes" + +if wait_for_file "$CSV_RESULT" 45; then + echo "[demo-workflows] CSV result file detected" +else + echo "[demo-workflows] ERROR: CSV result file not detected within timeout" +fi +pause 2 +stop_monitor + +step "Monitor output logs" +echo "[demo-workflows] API log" +sed \ + -e '/paused_at=None is not a valid node, falling back to entry point/d' \ + -e '/end_run called but no run for execution/d' \ + "$LOG_API" +echo "[demo-workflows] CSV log" +sed \ + -e '/paused_at=None is not a valid node, falling back to entry point/d' \ + -e '/end_run called but no run for execution/d' \ + "$LOG_CSV" + +step "Result payloads" +cat "$API_RESULT" +cat "$CSV_RESULT" + +step "Validation" +uv run python -m examples.templates.procurement_approval_agent validate +uv run python -m examples.templates.procurement_approval_agent info + +echo +echo "[demo-workflows] Demo complete." 
diff --git a/examples/templates/procurement_approval_agent/deploy/install_launchd.sh b/examples/templates/procurement_approval_agent/deploy/install_launchd.sh new file mode 100755 index 0000000000..fe603d525b --- /dev/null +++ b/examples/templates/procurement_approval_agent/deploy/install_launchd.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../../../.." && pwd)" +cd "$REPO_ROOT" + +PLIST_DST="${PLIST_DST:-$HOME/Library/LaunchAgents/com.hive.procurement-approval-agent.plist}" +WATCH_DIR="${WATCH_DIR:-$HOME/procurement_approval_agent/watched_requests}" +LOG_FILE="${LOG_FILE:-/tmp/procurement_approval_agent_launchd.log}" +POLL_INTERVAL="${POLL_INTERVAL:-2.0}" + +mkdir -p "$HOME/Library/LaunchAgents" "$WATCH_DIR" + +uv run python -m examples.templates.procurement_approval_agent write-launchd \ + --destination "$PLIST_DST" \ + --watch-dir "$WATCH_DIR" \ + --poll-interval "$POLL_INTERVAL" \ + --log-file "$LOG_FILE" + +launchctl unload "$PLIST_DST" 2>/dev/null || true +launchctl load -w "$PLIST_DST" + +echo "Installed launchd service: $PLIST_DST" +echo "Watch directory: $WATCH_DIR" +echo "Log: $LOG_FILE" diff --git a/examples/templates/procurement_approval_agent/flowchart.json b/examples/templates/procurement_approval_agent/flowchart.json new file mode 100644 index 0000000000..8b4490f14f --- /dev/null +++ b/examples/templates/procurement_approval_agent/flowchart.json @@ -0,0 +1,553 @@ +{ + "original_draft": { + "agent_name": "procurement_approval_agent", + "goal": "Automate purchase request approval with adaptive QuickBooks sync routing.", + "description": "", + "success_criteria": [ + "Approved requests remain within budget", + "Manual threshold requests get manager review", + "Approved requests produce PO artifacts" + ], + "constraints": [ + "Requests must pass budget validation", + "Requests must pass vendor validation" + ], + "nodes": [ + { + "id": "setup-wizard", + "name": "Setup Wizard", + 
"description": "First-run onboarding for QuickBooks integration preference", + "node_type": "event_loop", + "tools": [], + "input_keys": [], + "output_keys": [ + "setup_completed", + "preferred_sync_method" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "start", + "flowchart_shape": "stadium", + "flowchart_color": "#8aad3f" + }, + { + "id": "pre-execution-check", + "name": "Execution Confirmation", + "description": "Confirm whether to process this request now", + "node_type": "event_loop", + "tools": [], + "input_keys": [ + "item", + "cost", + "department", + "requester" + ], + "output_keys": [ + "process_request" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "decision", + "flowchart_shape": "diamond", + "flowchart_color": "#d89d26" + }, + { + "id": "request-cancelled", + "name": "Request Cancelled", + "description": "Terminal node when user chooses not to process request", + "node_type": "event_loop", + "tools": [], + "input_keys": [ + "item", + "requester" + ], + "output_keys": [ + "request_cancelled" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "terminal", + "flowchart_shape": "stadium", + "flowchart_color": "#b5453a" + }, + { + "id": "intake", + "name": "Request Intake", + "description": "Validate and parse purchase request", + "node_type": "event_loop", + "tools": [], + "input_keys": [ + "item", + "cost", + "justification", + "requester", + "department", + "vendor" + ], + "output_keys": [ + "validated_request" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "process", + "flowchart_shape": "rectangle", + "flowchart_color": "#b5a575" + }, + { + "id": "budget-check", + "name": "Budget Validation", + "description": "Check if request fits within department budget", + "node_type": "event_loop", + "tools": [ + "load_data" + ], + "input_keys": [ + "validated_request" + ], + "output_keys": [ + "budget_status", + "remaining_budget" + ], + "success_criteria": "", + 
"sub_agents": [], + "flowchart_type": "decision", + "flowchart_shape": "diamond", + "flowchart_color": "#d89d26" + }, + { + "id": "vendor-check", + "name": "Vendor Validation", + "description": "Check whether vendor is on approved list", + "node_type": "event_loop", + "tools": [ + "load_data" + ], + "input_keys": [ + "validated_request" + ], + "output_keys": [ + "vendor_approved" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "database", + "flowchart_shape": "cylinder", + "flowchart_color": "#508878" + }, + { + "id": "manager-approval", + "name": "Manager Approval", + "description": "Get manager approval for purchase request", + "node_type": "event_loop", + "tools": [], + "input_keys": [ + "validated_request", + "budget_status", + "remaining_budget" + ], + "output_keys": [ + "approval_decision", + "approver_name" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "decision", + "flowchart_shape": "diamond", + "flowchart_color": "#d89d26" + }, + { + "id": "po-generator", + "name": "PO Generator", + "description": "Generate purchase order documents", + "node_type": "event_loop", + "tools": [ + "save_data" + ], + "input_keys": [ + "validated_request" + ], + "output_keys": [ + "po_number", + "po_data", + "po_files_created" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "database", + "flowchart_shape": "cylinder", + "flowchart_color": "#508878" + }, + { + "id": "integration-setup-check", + "name": "Integration Setup Check", + "description": "Ask whether API credentials are available for this request", + "node_type": "event_loop", + "tools": [], + "input_keys": [ + "po_number" + ], + "output_keys": [ + "declared_qb_api_available", + "declared_sync_preference" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "process", + "flowchart_shape": "rectangle", + "flowchart_color": "#b5a575" + }, + { + "id": "integration-check", + "name": "Integration Capability Check", + "description": 
"Detect whether QuickBooks API credentials are available", + "node_type": "event_loop", + "tools": [], + "input_keys": [ + "po_number", + "po_data" + ], + "output_keys": [ + "has_qb_api", + "sync_method" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "process", + "flowchart_shape": "rectangle", + "flowchart_color": "#b5a575" + }, + { + "id": "pre-sync-confirmation", + "name": "Pre-Sync Confirmation", + "description": "Final yes/no confirmation before sync/export", + "node_type": "event_loop", + "tools": [], + "input_keys": [ + "sync_method", + "po_number" + ], + "output_keys": [ + "sync_confirmed" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "decision", + "flowchart_shape": "diamond", + "flowchart_color": "#d89d26" + }, + { + "id": "sync-cancelled", + "name": "Sync Cancelled", + "description": "Terminal node when user declines final sync/export", + "node_type": "event_loop", + "tools": [], + "input_keys": [ + "po_number", + "sync_method" + ], + "output_keys": [ + "sync_cancelled" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "terminal", + "flowchart_shape": "stadium", + "flowchart_color": "#b5453a" + }, + { + "id": "quickbooks-sync", + "name": "QuickBooks Sync", + "description": "Sync generated PO to QuickBooks API", + "node_type": "event_loop", + "tools": [], + "input_keys": [ + "po_number", + "po_data" + ], + "output_keys": [ + "qb_po_id", + "sync_status" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "process", + "flowchart_shape": "rectangle", + "flowchart_color": "#b5a575" + }, + { + "id": "csv-export", + "name": "QuickBooks CSV Export", + "description": "Generate CSV + instructions when API sync is unavailable", + "node_type": "event_loop", + "tools": [ + "save_data" + ], + "input_keys": [ + "po_number", + "po_data" + ], + "output_keys": [ + "csv_file_path", + "import_instructions" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": 
"database", + "flowchart_shape": "cylinder", + "flowchart_color": "#508878" + }, + { + "id": "notifications", + "name": "Generate Notifications", + "description": "Create notification files for stakeholders", + "node_type": "event_loop", + "tools": [ + "save_data" + ], + "input_keys": [ + "validated_request", + "po_number", + "po_files_created", + "sync_method" + ], + "output_keys": [ + "notifications_created" + ], + "success_criteria": "", + "sub_agents": [], + "flowchart_type": "terminal", + "flowchart_shape": "stadium", + "flowchart_color": "#b5453a" + } + ], + "edges": [ + { + "id": "edge-0", + "source": "setup-wizard", + "target": "pre-execution-check", + "condition": "on_success", + "description": "", + "label": "" + }, + { + "id": "edge-1", + "source": "pre-execution-check", + "target": "intake", + "condition": "conditional", + "description": "", + "label": "" + }, + { + "id": "edge-2", + "source": "pre-execution-check", + "target": "request-cancelled", + "condition": "conditional", + "description": "", + "label": "" + }, + { + "id": "edge-3", + "source": "intake", + "target": "budget-check", + "condition": "on_success", + "description": "", + "label": "" + }, + { + "id": "edge-4", + "source": "budget-check", + "target": "manager-approval", + "condition": "conditional", + "description": "", + "label": "" + }, + { + "id": "edge-5", + "source": "budget-check", + "target": "vendor-check", + "condition": "conditional", + "description": "", + "label": "" + }, + { + "id": "edge-6", + "source": "manager-approval", + "target": "vendor-check", + "condition": "conditional", + "description": "", + "label": "" + }, + { + "id": "edge-7", + "source": "manager-approval", + "target": "intake", + "condition": "conditional", + "description": "", + "label": "" + }, + { + "id": "edge-8", + "source": "vendor-check", + "target": "po-generator", + "condition": "conditional", + "description": "", + "label": "" + }, + { + "id": "edge-9", + "source": "po-generator", + "target": 
"integration-setup-check", + "condition": "on_success", + "description": "", + "label": "" + }, + { + "id": "edge-10", + "source": "integration-setup-check", + "target": "integration-check", + "condition": "on_success", + "description": "", + "label": "" + }, + { + "id": "edge-11", + "source": "integration-check", + "target": "pre-sync-confirmation", + "condition": "on_success", + "description": "", + "label": "" + }, + { + "id": "edge-12", + "source": "pre-sync-confirmation", + "target": "quickbooks-sync", + "condition": "conditional", + "description": "", + "label": "" + }, + { + "id": "edge-13", + "source": "pre-sync-confirmation", + "target": "csv-export", + "condition": "conditional", + "description": "", + "label": "" + }, + { + "id": "edge-14", + "source": "pre-sync-confirmation", + "target": "sync-cancelled", + "condition": "conditional", + "description": "", + "label": "" + }, + { + "id": "edge-15", + "source": "quickbooks-sync", + "target": "notifications", + "condition": "on_success", + "description": "", + "label": "" + }, + { + "id": "edge-16", + "source": "csv-export", + "target": "notifications", + "condition": "on_success", + "description": "", + "label": "" + } + ], + "entry_node": "setup-wizard", + "terminal_nodes": [ + "notifications", + "request-cancelled", + "sync-cancelled" + ], + "flowchart_legend": { + "start": { + "shape": "stadium", + "color": "#8aad3f" + }, + "terminal": { + "shape": "stadium", + "color": "#b5453a" + }, + "process": { + "shape": "rectangle", + "color": "#b5a575" + }, + "decision": { + "shape": "diamond", + "color": "#d89d26" + }, + "io": { + "shape": "parallelogram", + "color": "#d06818" + }, + "document": { + "shape": "document", + "color": "#c4b830" + }, + "database": { + "shape": "cylinder", + "color": "#508878" + }, + "subprocess": { + "shape": "subroutine", + "color": "#887a48" + }, + "browser": { + "shape": "hexagon", + "color": "#cc8850" + } + } + }, + "flowchart_map": { + "setup-wizard": [ + "setup-wizard" + ], + 
"pre-execution-check": [ + "pre-execution-check" + ], + "request-cancelled": [ + "request-cancelled" + ], + "intake": [ + "intake" + ], + "budget-check": [ + "budget-check" + ], + "vendor-check": [ + "vendor-check" + ], + "manager-approval": [ + "manager-approval" + ], + "po-generator": [ + "po-generator" + ], + "integration-setup-check": [ + "integration-setup-check" + ], + "integration-check": [ + "integration-check" + ], + "pre-sync-confirmation": [ + "pre-sync-confirmation" + ], + "sync-cancelled": [ + "sync-cancelled" + ], + "quickbooks-sync": [ + "quickbooks-sync" + ], + "csv-export": [ + "csv-export" + ], + "notifications": [ + "notifications" + ] + } +} diff --git a/examples/templates/procurement_approval_agent/mcp_servers.json b/examples/templates/procurement_approval_agent/mcp_servers.json new file mode 100644 index 0000000000..3c7ae27059 --- /dev/null +++ b/examples/templates/procurement_approval_agent/mcp_servers.json @@ -0,0 +1,9 @@ +{ + "hive-tools": { + "transport": "stdio", + "command": "uv", + "args": ["run", "python", "mcp_server.py", "--stdio"], + "cwd": "../../../tools", + "description": "Hive tools MCP server providing save_data/load_data and related utilities" + } +} diff --git a/examples/templates/procurement_approval_agent/monitor.py b/examples/templates/procurement_approval_agent/monitor.py new file mode 100644 index 0000000000..c7722771a1 --- /dev/null +++ b/examples/templates/procurement_approval_agent/monitor.py @@ -0,0 +1,509 @@ +"""Continuous monitoring utilities for Procurement Approval Agent.""" + +from __future__ import annotations + +import asyncio +import hashlib +import json +import os +import shutil +import subprocess +import sys +import time +import urllib.request +from dataclasses import dataclass +from email.message import EmailMessage +from pathlib import Path +from typing import Any +import smtplib + +from .agent import default_agent +from .nodes.quickbooks import has_quickbooks_api_credentials + + +REQUIRED_FIELDS = { + 
"item", + "cost", + "department", + "requester", + "justification", +} + + +@dataclass +class MonitorResult: + source_file: Path + success: bool + output_file: Path + archive_file: Path + output: dict[str, Any] + + +class RequestMonitor: + """Poll a folder and process each request JSON through the workflow.""" + + def __init__( + self, + watch_dir: Path, + poll_interval: float = 2.0, + mock_mode: bool = False, + mock_qb: bool = True, + auto_open_csv: bool = False, + notify: bool = True, + force: bool = False, + interactive: bool = False, + default_process_request: bool = True, + default_sync_confirmed: bool = True, + sync_method: str = "auto", + qb_available: str = "auto", + qb_credential_ref: str | None = None, + ) -> None: + self.watch_dir = watch_dir + self.poll_interval = poll_interval + self.mock_mode = mock_mode + self.mock_qb = mock_qb + self.auto_open_csv = auto_open_csv + self.notify = notify + self.force = force + self.interactive = interactive + self.default_process_request = default_process_request + self.default_sync_confirmed = default_sync_confirmed + self.sync_method = sync_method + self.qb_available = qb_available + self.qb_credential_ref = qb_credential_ref + + self.processing_dir = self.watch_dir / "processing" + self.done_dir = self.watch_dir / "done" + self.failed_dir = self.watch_dir / "failed" + self.results_dir = self.watch_dir / "results" + self.history_file = self.watch_dir / "history.json" + + for d in [ + self.watch_dir, + self.processing_dir, + self.done_dir, + self.failed_dir, + self.results_dir, + ]: + d.mkdir(parents=True, exist_ok=True) + + def _load_request(self, path: Path) -> dict[str, Any]: + raw = json.loads(path.read_text(encoding="utf-8")) + missing = REQUIRED_FIELDS - set(raw.keys()) + if missing: + raise ValueError(f"Missing required fields: {sorted(missing)}") + if "vendor" not in raw or not raw["vendor"]: + raw["vendor"] = "Unknown" + return raw + + async def process_file(self, path: Path) -> MonitorResult: + 
processing_file = self.processing_dir / path.name + shutil.move(str(path), str(processing_file)) + + try: + request_data = self._load_request(processing_file) + request_hash = self._request_hash(request_data) + if not self.force and self._is_duplicate_recent(request_hash): + warning = ( + "Duplicate request detected in last 24h; skipping. " + "Use --force to override." + ) + output = { + "success": False, + "error": warning, + "steps_executed": 0, + "output": {}, + } + output_file = self.results_dir / f"{processing_file.stem}.result.json" + output_file.write_text( + json.dumps(output, indent=2, default=str), encoding="utf-8" + ) + archive_file = self.failed_dir / processing_file.name + shutil.move(str(processing_file), str(archive_file)) + print(f"[monitor] WARNING duplicate skipped: {processing_file.name}") + return MonitorResult( + source_file=path, + success=False, + output_file=output_file, + archive_file=archive_file, + output=output, + ) + + runtime_context = dict(request_data) + runtime_context.update(self._runtime_controls(processing_file.name)) + if self.qb_credential_ref: + runtime_context["qb_credential_ref"] = self.qb_credential_ref + + result = await default_agent.run( + runtime_context, + mock_mode=self.mock_mode, + mock_qb=self.mock_qb, + ) + + output = { + "success": result.success, + "error": result.error, + "steps_executed": result.steps_executed, + "output": result.output or {}, + } + output_file = self.results_dir / f"{processing_file.stem}.result.json" + output_file.write_text( + json.dumps(output, indent=2, default=str), encoding="utf-8" + ) + + archive_base = self.done_dir if result.success else self.failed_dir + archive_file = archive_base / processing_file.name + shutil.move(str(processing_file), str(archive_file)) + if result.success: + self._record_request_hash(request_hash) + + if result.success: + self._auto_post_actions(result.output or {}) + if self.notify: + self._send_notifications(processing_file.name, output) + + return 
MonitorResult( + source_file=path, + success=bool(result.success), + output_file=output_file, + archive_file=archive_file, + output=output, + ) + except Exception as exc: + output = { + "success": False, + "error": str(exc), + "steps_executed": 0, + "output": {}, + } + output_file = self.results_dir / f"{processing_file.stem}.result.json" + output_file.write_text( + json.dumps(output, indent=2, default=str), encoding="utf-8" + ) + archive_file = self.failed_dir / processing_file.name + if processing_file.exists(): + shutil.move(str(processing_file), str(archive_file)) + if self.notify: + self._send_notifications(processing_file.name, output) + return MonitorResult( + source_file=path, + success=False, + output_file=output_file, + archive_file=archive_file, + output=output, + ) + + def _prompt_yes_no(self, question: str, default: bool) -> bool: + suffix = "Y/n" if default else "y/N" + while True: + response = input(f"{question} [{suffix}]: ").strip().lower() + if response == "": + return default + if response in {"y", "yes"}: + return True + if response in {"n", "no"}: + return False + print("Please answer yes or no.") + + def _runtime_controls(self, request_name: str) -> dict[str, Any]: + if self.interactive: + print(f"[monitor] interactive checkpoint for {request_name}") + process_request = self._prompt_yes_no( + "Process this purchase request now?", + default=self.default_process_request, + ) + controls: dict[str, Any] = {"process_request": process_request} + if not process_request: + controls["sync_confirmed"] = False + return controls + + has_qb = self._prompt_yes_no( + "Do you have QuickBooks API credentials configured for this run?", + default=has_quickbooks_api_credentials( + credential_ref=self.qb_credential_ref + ), + ) + controls["declared_qb_api_available"] = has_qb + controls["declared_sync_preference"] = "api" if has_qb else "csv" + controls["sync_confirmed"] = self._prompt_yes_no( + "Proceed with final sync/export step after PO generation?", + 
default=self.default_sync_confirmed, + ) + return controls + + controls = { + "process_request": self.default_process_request, + "sync_confirmed": self.default_sync_confirmed, + } + if self.sync_method in {"api", "csv"}: + controls["declared_sync_preference"] = self.sync_method + controls["declared_qb_api_available"] = self.sync_method == "api" + return controls + if self.qb_available in {"yes", "no"}: + has_qb = self.qb_available == "yes" + controls["declared_qb_api_available"] = has_qb + controls["declared_sync_preference"] = "api" if has_qb else "csv" + return controls + + has_qb = has_quickbooks_api_credentials(credential_ref=self.qb_credential_ref) + controls["declared_qb_api_available"] = has_qb + controls["declared_sync_preference"] = "api" if has_qb else "csv" + return controls + + def _request_hash(self, request_data: dict[str, Any]) -> str: + key = "|".join( + [ + str(request_data.get("item", "")).strip().lower(), + str(request_data.get("cost", "")).strip(), + str(request_data.get("department", "")).strip().lower(), + str(request_data.get("requester", "")).strip().lower(), + ] + ) + return hashlib.sha256(key.encode("utf-8")).hexdigest() + + def _load_history(self) -> list[dict[str, Any]]: + if not self.history_file.exists(): + return [] + try: + data = json.loads(self.history_file.read_text(encoding="utf-8")) + return data if isinstance(data, list) else [] + except (json.JSONDecodeError, OSError): + return [] + + def _save_history(self, rows: list[dict[str, Any]]) -> None: + self.history_file.write_text(json.dumps(rows, indent=2), encoding="utf-8") + + def _is_duplicate_recent(self, request_hash: str) -> bool: + cutoff = time.time() - 24 * 60 * 60 + history = self._load_history() + recent = [row for row in history if float(row.get("ts", 0)) >= cutoff] + self._save_history(recent) + return any(row.get("hash") == request_hash for row in recent) + + def _record_request_hash(self, request_hash: str) -> None: + history = self._load_history() + cutoff = 
time.time() - 24 * 60 * 60 + history = [row for row in history if float(row.get("ts", 0)) >= cutoff] + history.append({"hash": request_hash, "ts": time.time()}) + self._save_history(history) + + async def process_once(self) -> list[MonitorResult]: + candidates = sorted( + [ + p + for p in self.watch_dir.glob("*.json") + if p.is_file() and p.name != self.history_file.name + ], + key=lambda p: p.stat().st_mtime, + ) + results: list[MonitorResult] = [] + for path in candidates: + result = await self.process_file(path) + status = "SUCCESS" if result.success else "FAILED" + print(f"[monitor] {status} {path.name} -> {result.output_file}") + results.append(result) + return results + + async def run_forever(self) -> None: + while True: + await self.process_once() + await asyncio.sleep(self.poll_interval) + + def _auto_post_actions(self, workflow_output: dict[str, Any]) -> None: + sync_method = workflow_output.get("sync_method") + if sync_method == "csv" and self.auto_open_csv: + csv_rel = workflow_output.get("csv_file_path") + if isinstance(csv_rel, str) and csv_rel: + self._reveal_file(csv_rel) + + def _reveal_file(self, rel_path: str) -> None: + full_path = Path(__file__).resolve().parents[0] / rel_path + if not full_path.exists(): + return + try: + if sys.platform == "darwin": + subprocess.run(["open", "-R", str(full_path)], check=False) + elif sys.platform.startswith("linux"): + subprocess.run(["xdg-open", str(full_path.parent)], check=False) + except Exception: + pass + + def _send_notifications(self, request_name: str, payload: dict[str, Any]) -> None: + status = "SUCCESS" if payload.get("success") else "FAILED" + subject = f"[Procurement Agent] {status}: {request_name}" + text = json.dumps(payload, indent=2, default=str) + + self._notify_slack(subject, payload) + self._notify_email(subject, text) + + def _notify_slack(self, title: str, payload: dict[str, Any]) -> None: + webhook = os.environ.get("SLACK_WEBHOOK_URL") + if not webhook: + return + body = { + "text": 
f"{title}\n```{json.dumps(payload, default=str)[:2800]}```", + } + req = urllib.request.Request( + webhook, + data=json.dumps(body).encode("utf-8"), + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + urllib.request.urlopen(req, timeout=5).read() + except Exception: + pass + + def _notify_email(self, subject: str, body: str) -> None: + host = os.environ.get("SMTP_HOST") + port = os.environ.get("SMTP_PORT") + username = os.environ.get("SMTP_USERNAME") + password = os.environ.get("SMTP_PASSWORD") + sender = os.environ.get("SMTP_FROM") + recipient = os.environ.get("SMTP_TO") + + if not all([host, port, sender, recipient]): + return + + msg = EmailMessage() + msg["Subject"] = subject + msg["From"] = sender + msg["To"] = recipient + msg.set_content(body) + + try: + with smtplib.SMTP(host, int(port), timeout=10) as smtp: + if username and password: + smtp.starttls() + smtp.login(username, password) + smtp.send_message(msg) + except Exception: + pass + + +def spawn_daemon( + watch_dir: Path, + poll_interval: float, + mock_mode: bool, + mock_qb: bool, + auto_open_csv: bool, + notify: bool, + force: bool, + default_process_request: bool, + default_sync_confirmed: bool, + sync_method: str, + qb_available: str, + qb_credential_ref: str | None, + log_file: Path, +) -> int: + """Spawn detached monitor subprocess and return its PID.""" + cmd = [ + sys.executable, + "-m", + "procurement_approval_agent", + "monitor", + "--watch-dir", + str(watch_dir), + "--poll-interval", + str(poll_interval), + "--no-daemon", + ] + if mock_mode: + cmd.append("--mock") + if not mock_qb: + cmd.append("--no-mock-qb") + if auto_open_csv: + cmd.append("--auto-open-csv") + if not notify: + cmd.append("--no-notify") + if force: + cmd.append("--force") + if not default_process_request: + cmd.append("--skip-process") + if not default_sync_confirmed: + cmd.append("--sync-cancel") + if sync_method in {"api", "csv"}: + cmd.extend(["--sync-method", sync_method]) + if qb_available in 
{"yes", "no"}: + cmd.extend(["--qb-available", qb_available]) + if qb_credential_ref: + cmd.extend(["--qb-credential-ref", qb_credential_ref]) + + log_file.parent.mkdir(parents=True, exist_ok=True) + with open(log_file, "a", encoding="utf-8") as log: + proc = subprocess.Popen( + cmd, + stdout=log, + stderr=log, + start_new_session=True, + cwd=str(Path.cwd()), + env=os.environ.copy(), + ) + return proc.pid + + +def launchd_plist_content( + label: str, + working_dir: Path, + watch_dir: Path, + log_file: Path, + poll_interval: float, +) -> str: + """Generate launchd plist content for macOS background service.""" + return f"""<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> +<plist version="1.0"> +<dict> + <key>Label</key> + <string>{label}</string> + <key>ProgramArguments</key> + <array> + <string>{sys.executable}</string> + <string>-m</string> + <string>procurement_approval_agent</string> + <string>monitor</string> + <string>--watch-dir</string> + <string>{watch_dir}</string> + <string>--poll-interval</string> + <string>{poll_interval}</string> + </array> + <key>WorkingDirectory</key> + <string>{working_dir}</string> + <key>EnvironmentVariables</key> + <dict> + <key>PYTHONPATH</key> + <string>core:examples/templates</string> + </dict> + <key>RunAtLoad</key> + <true/> + <key>KeepAlive</key> + <true/> + <key>StandardOutPath</key> + <string>{log_file}</string> + <key>StandardErrorPath</key> + <string>{log_file}</string> +</dict> +</plist> +""" + + +def write_launchd_plist( + destination: Path, + label: str, + working_dir: Path, + watch_dir: Path, + log_file: Path, + poll_interval: float, +) -> Path: + destination.parent.mkdir(parents=True, exist_ok=True) + destination.write_text( + launchd_plist_content( + label=label, + working_dir=working_dir, + watch_dir=watch_dir, + log_file=log_file, + poll_interval=poll_interval, + ), + encoding="utf-8", + ) + return destination diff --git a/examples/templates/procurement_approval_agent/nodes/__init__.py b/examples/templates/procurement_approval_agent/nodes/__init__.py new file mode 100644 index 0000000000..955755f19a --- /dev/null +++ b/examples/templates/procurement_approval_agent/nodes/__init__.py @@ -0,0 +1,403 @@ +"""Node definitions for Procurement Approval Agent.""" + +from framework.graph import NodeSpec + +# Node 0: First-run setup wizard +setup_wizard_node = NodeSpec( + id="setup-wizard", + name="Setup Wizard", + description="First-run onboarding for QuickBooks 
integration preference", + node_type="event_loop", + client_facing=True, + max_node_visits=1, + input_keys=[], + output_keys=["setup_completed", "preferred_sync_method"], + system_prompt="""\ +You are the onboarding setup wizard for Procurement Approval Agent. + +Only run this during first execution. + +Ask the user: +"Do you have QuickBooks API credentials configured? (yes/no)" + +If yes: +- explain API sync mode and that credentials should be configured +- set_output("preferred_sync_method", "api") + +If no: +- explain CSV fallback workflow for manual import +- set_output("preferred_sync_method", "csv") + +Then always: +- set_output("setup_completed", true) +""", + tools=[], +) + +# Node 1: Intake - Receives purchase request +pre_execution_check_node = NodeSpec( + id="pre-execution-check", + name="Execution Confirmation", + description="Confirm whether to process this request now", + node_type="event_loop", + client_facing=True, + max_node_visits=1, + input_keys=["item", "cost", "department", "requester"], + output_keys=["process_request"], + system_prompt="""\ +You are an execution gate for procurement processing. + +Ask: +"Do you want to process this purchase request now? (yes/no)" + +If yes: +- set_output("process_request", true) +If no: +- set_output("process_request", false) +""", + tools=[], +) + +request_cancelled_node = NodeSpec( + id="request-cancelled", + name="Request Cancelled", + description="Terminal node when user chooses not to process request", + node_type="event_loop", + client_facing=False, + max_node_visits=1, + input_keys=["item", "requester"], + output_keys=["request_cancelled"], + system_prompt="""\ +You are a cancellation handler. + +The user chose not to process this request now. 
+Call: +- set_output("request_cancelled", true) +""", + tools=[], +) + + +intake_node = NodeSpec( + id="intake", + name="Request Intake", + description="Validate and parse purchase request", + node_type="event_loop", + client_facing=False, + input_keys=["item", "cost", "justification", "requester", "department", "vendor"], + output_keys=["validated_request"], + system_prompt="""\ +You are a purchase request validator. + +Your task: +1. Check all required fields are present: item, cost, justification, requester, department. +2. Validate cost is a positive number. +3. Ensure justification is at least 10 words. +4. Normalize vendor to "Unknown" when not provided. + +If validation passes: +- call set_output("validated_request", {...}) with item, cost, justification, requester, department, vendor + +If validation fails: +- explain what failed in plain language +- do not set outputs +""", + tools=[], +) + +# Node 2: Budget Check +budget_check_node = NodeSpec( + id="budget-check", + name="Budget Validation", + description="Check if request fits within department budget", + node_type="event_loop", + client_facing=False, + input_keys=["validated_request"], + output_keys=["budget_status", "remaining_budget"], + system_prompt="""\ +You are a budget validator. + +Task: +1. Use load_data to read `data/budget_tracking.db`. +2. Compute remaining_budget = allocated - spent for the request department. +3. 
Determine budget_status: + - cost <= remaining_budget * 0.9 => auto_approved + - cost <= remaining_budget => needs_approval + - cost > remaining_budget => denied + +Then call: +- set_output("budget_status", "auto_approved" | "needs_approval" | "denied") +- set_output("remaining_budget", <number>) + +If department is missing in budget data: +- set_output("budget_status", "denied") +- set_output("remaining_budget", 0) +""", + tools=["load_data"], +) + +# Node 3: Manager Approval - client-facing node +approval_node = NodeSpec( + id="manager-approval", + name="Manager Approval", + description="Get manager approval for purchase request", + node_type="event_loop", + client_facing=True, + input_keys=["validated_request", "budget_status", "remaining_budget"], + output_keys=["approval_decision", "approver_name"], + nullable_output_keys=["approver_name"], + max_node_visits=1, + system_prompt="""\ +You are a procurement approval interface. + +STEP 1 - Present the request details clearly to the manager and ask: +"Do you approve this request? (yes/no)" + +Include: +- Item +- Cost +- Department +- Requester +- Justification +- Budget impact (cost vs remaining budget) + +STEP 2 - After manager response, set outputs: +- approved => set_output("approval_decision", "approved") and set_output("approver_name", "Manager") +- rejected => set_output("approval_decision", "rejected") +""", + tools=[], +) + +# Node 4: Vendor Check +vendor_check_node = NodeSpec( + id="vendor-check", + name="Vendor Validation", + description="Check whether vendor is on approved list", + node_type="event_loop", + client_facing=False, + input_keys=["validated_request"], + output_keys=["vendor_approved"], + system_prompt="""\ +Check whether validated_request.vendor appears in `data/approved_vendors.csv`. + +Use load_data to read the CSV and set: +- set_output("vendor_approved", true/false) + +Comparison should be case-insensitive. 
+""", + tools=["load_data"], +) + +# Node 5: Generate PO +po_generator_node = NodeSpec( + id="po-generator", + name="PO Generator", + description="Generate purchase order documents", + node_type="event_loop", + client_facing=False, + input_keys=["validated_request"], + output_keys=["po_number", "po_data", "po_files_created"], + system_prompt="""\ +Generate a Purchase Order for the approved request. + +1. Build PO number in format: PO-YYYYMMDD-XXX. +2. Create structured PO JSON. +3. Create human-readable text summary. +4. Create a QuickBooks-compatible CSV. + +Use save_data to create files under data/po/: +- <po_number>.json +- <po_number>.txt +- <po_number>_qb_import.csv + +Then call: +- set_output("po_number", <po_number>) +- set_output("po_data", <po_json>) +- set_output("po_files_created", [<file1>, <file2>, <file3>]) +""", + tools=["save_data"], +) + +# Node 6: Integration Check +integration_setup_check_node = NodeSpec( + id="integration-setup-check", + name="Integration Setup Check", + description="Ask whether API credentials are available for this request", + node_type="event_loop", + client_facing=True, + max_node_visits=1, + input_keys=["po_number"], + output_keys=["declared_qb_api_available", "declared_sync_preference"], + system_prompt="""\ +You are an integration setup checkpoint. + +Ask: +"Do you have QuickBooks API credentials configured for this run? (yes/no)" + +If yes: +- set_output("declared_qb_api_available", true) +- set_output("declared_sync_preference", "api") +If no: +- set_output("declared_qb_api_available", false) +- set_output("declared_sync_preference", "csv") +""", + tools=[], +) + + +integration_check_node = NodeSpec( + id="integration-check", + name="Integration Capability Check", + description="Detect whether QuickBooks API credentials are available", + node_type="event_loop", + client_facing=False, + input_keys=["po_number", "po_data"], + output_keys=["has_qb_api", "sync_method"], + system_prompt="""\ +Determine whether QuickBooks API credentials are available. 
+ +Set: +- has_qb_api: true/false +- sync_method: "api" if credentials exist, otherwise "csv" + +Always call: +- set_output("has_qb_api", <true|false>) +- set_output("sync_method", "api" | "csv") +""", + tools=[], +) + +pre_sync_confirmation_node = NodeSpec( + id="pre-sync-confirmation", + name="Pre-Sync Confirmation", + description="Final yes/no confirmation before sync/export", + node_type="event_loop", + client_facing=True, + max_node_visits=1, + input_keys=["sync_method", "po_number"], + output_keys=["sync_confirmed"], + system_prompt="""\ +You are a final sync gate. + +Ask: +"Proceed with {{sync_method}} step for purchase order {{po_number}}? (yes/no)" + +If yes: +- set_output("sync_confirmed", true) +If no: +- set_output("sync_confirmed", false) +""", + tools=[], +) + +sync_cancelled_node = NodeSpec( + id="sync-cancelled", + name="Sync Cancelled", + description="Terminal node when user declines final sync/export", + node_type="event_loop", + client_facing=False, + max_node_visits=1, + input_keys=["po_number", "sync_method"], + output_keys=["sync_cancelled"], + system_prompt="""\ +You are a sync cancellation handler. + +The user declined the final sync/export step. +Call: +- set_output("sync_cancelled", true) +""", + tools=[], +) + +# Node 7: QuickBooks Sync +quickbooks_sync_node = NodeSpec( + id="quickbooks-sync", + name="QuickBooks Sync", + description="Sync generated PO to QuickBooks API", + node_type="event_loop", + client_facing=False, + input_keys=["po_number", "po_data"], + output_keys=["qb_po_id", "sync_status"], + system_prompt="""\ +Sync the PO to QuickBooks. 
+ +Inputs: +- po_number +- po_data + +For mock testing mode, simulate the API call and return: +- qb_po_id (fake QuickBooks PO ID) +- sync_status = "mock_synced" + +For real mode (future): +- use QuickBooks credentials and API endpoints +- return actual qb_po_id and status + +Always call: +- set_output("qb_po_id", <id>) +- set_output("sync_status", <status>) +""", + tools=[], +) + +# Node 8: CSV Export (fallback path) +csv_export_node = NodeSpec( + id="csv-export", + name="QuickBooks CSV Export", + description="Generate CSV + instructions when API sync is unavailable", + node_type="event_loop", + client_facing=False, + input_keys=["po_number", "po_data"], + output_keys=["csv_file_path", "import_instructions"], + system_prompt="""\ +Generate fallback QuickBooks import artifacts for manual upload. + +Create: +1. A QuickBooks-compatible CSV file +2. A markdown instructions file describing import steps + +Then call: +- set_output("csv_file_path", <path>) +- set_output("import_instructions", <path>) +""", + tools=["save_data"], +) + +# Node 9: Notification Generator +notification_node = NodeSpec( + id="notifications", + name="Generate Notifications", + description="Create notification files for stakeholders", + node_type="event_loop", + client_facing=False, + input_keys=["validated_request", "po_number", "po_files_created", "sync_method"], + output_keys=["notifications_created"], + system_prompt="""\ +Create notification markdown files under data/notifications/: +1. notification_requester_<po_number>.md +2. notification_finance_<po_number>.md +3. 
notification_manager_<po_number>.md + +Use save_data to write these files and then: +- set_output("notifications_created", [file1, file2, file3]) +""", + tools=["save_data"], +) + +__all__ = [ + "setup_wizard_node", + "pre_execution_check_node", + "request_cancelled_node", + "intake_node", + "budget_check_node", + "approval_node", + "vendor_check_node", + "po_generator_node", + "integration_setup_check_node", + "integration_check_node", + "pre_sync_confirmation_node", + "sync_cancelled_node", + "quickbooks_sync_node", + "csv_export_node", + "notification_node", +] diff --git a/examples/templates/procurement_approval_agent/nodes/approval.py b/examples/templates/procurement_approval_agent/nodes/approval.py new file mode 100644 index 0000000000..10abfe9b5c --- /dev/null +++ b/examples/templates/procurement_approval_agent/nodes/approval.py @@ -0,0 +1 @@ +"""Helpers for manager approval handling.""" diff --git a/examples/templates/procurement_approval_agent/nodes/budget_checker.py b/examples/templates/procurement_approval_agent/nodes/budget_checker.py new file mode 100644 index 0000000000..fd7dee6b96 --- /dev/null +++ b/examples/templates/procurement_approval_agent/nodes/budget_checker.py @@ -0,0 +1 @@ +"""Helpers for budget checks.""" diff --git a/examples/templates/procurement_approval_agent/nodes/intake.py b/examples/templates/procurement_approval_agent/nodes/intake.py new file mode 100644 index 0000000000..67be7708c4 --- /dev/null +++ b/examples/templates/procurement_approval_agent/nodes/intake.py @@ -0,0 +1 @@ +"""Helpers for request intake and normalization.""" diff --git a/examples/templates/procurement_approval_agent/nodes/po_generator.py b/examples/templates/procurement_approval_agent/nodes/po_generator.py new file mode 100644 index 0000000000..4c75d5a807 --- /dev/null +++ b/examples/templates/procurement_approval_agent/nodes/po_generator.py @@ -0,0 +1 @@ +"""Helpers for PO generation formatting.""" diff --git 
a/examples/templates/procurement_approval_agent/nodes/quickbooks.py b/examples/templates/procurement_approval_agent/nodes/quickbooks.py new file mode 100644 index 0000000000..e166e5a4d1 --- /dev/null +++ b/examples/templates/procurement_approval_agent/nodes/quickbooks.py @@ -0,0 +1,101 @@ +"""QuickBooks sync helpers for procurement approval workflow.""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from ..credentials import resolve_quickbooks_credentials + + +def has_quickbooks_api_credentials(credential_ref: str | None = None) -> bool: + """Detect QuickBooks credential availability (env or Hive name/alias credential).""" + creds = resolve_quickbooks_credentials(credential_ref=credential_ref) + return creds.has_minimum + + +def mock_quickbooks_api( + po_number: str, + po_data: dict[str, Any] | None, + output_path: Path | None = None, +) -> dict[str, Any]: + """Simulate a QuickBooks PO sync and persist a mock response log.""" + qb_po_id = f"QB-{po_number}" + payload = { + "po_number": po_number, + "po_data": po_data or {}, + } + response = { + "qb_po_id": qb_po_id, + "sync_status": "mock_synced", + "api": "quickbooks.purchase_orders.create", + } + + record = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "request": payload, + "response": response, + } + + if output_path is None: + output_path = ( + Path(__file__).resolve().parents[1] / "data" / "qb_mock_responses.json" + ) + + output_path.parent.mkdir(parents=True, exist_ok=True) + + existing: list[dict[str, Any]] = [] + if output_path.exists(): + try: + existing_raw = json.loads(output_path.read_text(encoding="utf-8")) + if isinstance(existing_raw, list): + existing = existing_raw + except json.JSONDecodeError: + existing = [] + + existing.append(record) + output_path.write_text(json.dumps(existing, indent=2), encoding="utf-8") + + print(f"[mock-qb] would sync PO {po_number} to QuickBooks API") + return 
response + + +def mock_csv_export( + po_number: str, + po_data: dict[str, Any] | None, + output_dir: Path | None = None, +) -> dict[str, str]: + """Generate fallback CSV and manual import instructions.""" + if output_dir is None: + output_dir = Path(__file__).resolve().parents[1] / "data" / "po" + + output_dir.mkdir(parents=True, exist_ok=True) + + csv_path = output_dir / f"{po_number}_qb_manual_import.csv" + md_path = output_dir / f"{po_number}_qb_import_instructions.md" + + vendor = (po_data or {}).get("vendor", "Unknown") + amount = (po_data or {}).get("amount", 0) + currency = (po_data or {}).get("currency", "USD") + + csv_path.write_text( + f"DocNumber,Vendor,Amount,Currency\n{po_number},{vendor},{amount},{currency}\n", + encoding="utf-8", + ) + + md_path.write_text( + "# QuickBooks Manual Import Instructions\n\n" + f"1. Open QuickBooks and go to import purchase orders.\n" + f"2. Upload `{csv_path.name}`.\n" + "3. Map columns: DocNumber, Vendor, Amount, Currency.\n" + "4. Validate preview and submit import.\n", + encoding="utf-8", + ) + + print(f"[mock-qb-csv] generated fallback CSV for PO {po_number}") + return { + "csv_file_path": str(Path("data") / "po" / csv_path.name), + "import_instructions": str(Path("data") / "po" / md_path.name), + } diff --git a/examples/templates/procurement_approval_agent/quickbooks_api.py b/examples/templates/procurement_approval_agent/quickbooks_api.py new file mode 100644 index 0000000000..07bed6ae60 --- /dev/null +++ b/examples/templates/procurement_approval_agent/quickbooks_api.py @@ -0,0 +1,287 @@ +"""Basic QuickBooks API client with OAuth refresh and retry support.""" + +from __future__ import annotations + +import base64 +import json +import os +import time +import urllib.error +import urllib.parse +import urllib.request +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from .credentials import resolve_quickbooks_credentials + + +class QuickBooksAPIError(RuntimeError): + """Raised when 
QuickBooks API operations fail.""" + + +@dataclass +class QuickBooksConfig: + client_id: str + client_secret: str + realm_id: str + refresh_token: str + environment: str = "sandbox" + + +class QuickBooksAPI: + """Minimal QuickBooks API client for PurchaseOrder create flow.""" + + def __init__( + self, + config: QuickBooksConfig, + token_cache_path: Path | None = None, + ) -> None: + self.config = config + self.token_cache_path = token_cache_path + self._access_token: str | None = None + self._token_expires_at: float = 0.0 + self._load_token_cache() + + @staticmethod + def from_env( + token_cache_path: Path | None = None, + credential_ref: str | None = None, + ) -> "QuickBooksAPI": + creds = resolve_quickbooks_credentials(credential_ref=credential_ref) + + missing = [ + key + for key, value in { + "client_id": creds.client_id, + "client_secret": creds.client_secret, + "realm_id": creds.realm_id, + "refresh_token": creds.refresh_token, + }.items() + if not value + ] + if missing: + raise QuickBooksAPIError( + "Missing QuickBooks values for real API mode " + f"(source={creds.source}): {', '.join(missing)}" + ) + + return QuickBooksAPI( + config=QuickBooksConfig( + client_id=str(creds.client_id), + client_secret=str(creds.client_secret), + realm_id=str(creds.realm_id), + refresh_token=str(creds.refresh_token), + environment=str(creds.environment or "sandbox"), + ), + token_cache_path=token_cache_path, + ) + + def create_purchase_order(self, po_data: dict[str, Any]) -> dict[str, Any]: + """Create a PurchaseOrder in QuickBooks with retry and token refresh.""" + if os.environ.get("QUICKBOOKS_USE_MOCK", "false").lower() == "true": + po_number = str(po_data.get("po_number", "PO-MOCK")) + return { + "id": f"QB-{po_number}", + "doc_number": po_number, + "sync_status": "mock_synced", + "raw": {"mock": True, "po_data": po_data}, + } + + payload = self._build_purchase_order_payload(po_data) + response = self._request_with_retry( + method="POST", + url=self._purchase_order_url(), 
+ headers={ + "Content-Type": "application/json", + "Accept": "application/json", + }, + body=json.dumps(payload).encode("utf-8"), + ) + + po = response.get("PurchaseOrder", {}) + qb_id = str(po.get("Id") or po.get("id") or "") + doc_number = str(po.get("DocNumber") or payload.get("DocNumber") or "") + if not qb_id: + raise QuickBooksAPIError( + f"QuickBooks response missing PurchaseOrder ID: {response}" + ) + + return { + "id": qb_id, + "doc_number": doc_number, + "sync_status": "synced", + "raw": response, + } + + def _build_purchase_order_payload(self, po_data: dict[str, Any]) -> dict[str, Any]: + po_number = str(po_data.get("po_number", "")) + amount = float(po_data.get("amount", 0) or 0) + currency = str(po_data.get("currency", "USD")) + vendor_name = str(po_data.get("vendor", "Unknown Vendor")) + # Keep the template payload valid even when callers have not mapped a real + # expense account yet; real deployments can override this via po_data. + account_ref = str(po_data.get("account_ref") or "1") + + # Basic payload that remains reviewable; production mapping can be expanded. 
+ return { + "DocNumber": po_number, + "VendorRef": { + "name": vendor_name, + }, + "PrivateNote": f"Procurement approval sync for {po_number}", + "CurrencyRef": {"value": currency}, + "Line": [ + { + "Amount": amount, + "DetailType": "AccountBasedExpenseLineDetail", + "Description": f"Purchase order {po_number}", + "AccountBasedExpenseLineDetail": { + "AccountRef": {"value": account_ref}, + }, + } + ], + } + + def _purchase_order_url(self) -> str: + base = self._api_base_url() + return f"{base}/v3/company/{self.config.realm_id}/purchaseorder" + + def _api_base_url(self) -> str: + env = self.config.environment.lower() + if env == "production": + return "https://quickbooks.api.intuit.com" + return "https://sandbox-quickbooks.api.intuit.com" + + def _token_url(self) -> str: + return "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer" + + def _request_with_retry( + self, + method: str, + url: str, + headers: dict[str, str], + body: bytes | None = None, + retries: int = 3, + base_delay: float = 0.5, + ) -> dict[str, Any]: + last_error: Exception | None = None + for attempt in range(retries): + try: + access_token = self._ensure_access_token() + req_headers = { + **headers, + "Authorization": f"Bearer {access_token}", + } + req = urllib.request.Request( + url, data=body, headers=req_headers, method=method + ) + with urllib.request.urlopen(req, timeout=20) as resp: + content = resp.read().decode("utf-8") + return json.loads(content) if content else {} + except urllib.error.HTTPError as exc: + # One token refresh retry path on auth failure. 
+ if exc.code == 401: + self._refresh_access_token(force=True) + elif exc.code in (429, 500, 502, 503, 504): + pass + else: + detail = exc.read().decode("utf-8", errors="ignore") + raise QuickBooksAPIError( + f"QuickBooks API HTTP {exc.code}: {detail or exc.reason}" + ) from exc + last_error = exc + except (urllib.error.URLError, TimeoutError) as exc: + last_error = exc + + if attempt < retries - 1: + time.sleep(base_delay * (2**attempt)) + + raise QuickBooksAPIError( + f"QuickBooks API request failed after retries: {last_error}" + ) + + def _ensure_access_token(self) -> str: + # Refresh if missing or expiring within 60 seconds. + if not self._access_token or time.time() > (self._token_expires_at - 60): + self._refresh_access_token() + if not self._access_token: + raise QuickBooksAPIError("Failed to obtain QuickBooks access token") + return self._access_token + + def _refresh_access_token(self, force: bool = False) -> None: + if ( + self._access_token + and not force + and time.time() <= (self._token_expires_at - 60) + ): + return + + auth = base64.b64encode( + f"{self.config.client_id}:{self.config.client_secret}".encode("utf-8") + ).decode("utf-8") + form = urllib.parse.urlencode( + { + "grant_type": "refresh_token", + "refresh_token": self.config.refresh_token, + } + ).encode("utf-8") + + req = urllib.request.Request( + self._token_url(), + data=form, + headers={ + "Authorization": f"Basic {auth}", + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + }, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=20) as resp: + payload = json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as exc: + detail = exc.read().decode("utf-8", errors="ignore") + raise QuickBooksAPIError( + f"QuickBooks token refresh failed ({exc.code}): {detail or exc.reason}" + ) from exc + + self._access_token = payload.get("access_token") + expires_in = int(payload.get("expires_in", 3600)) + self._token_expires_at = 
time.time() + expires_in + self._save_token_cache() + + def _load_token_cache(self) -> None: + if not self.token_cache_path or not self.token_cache_path.exists(): + return + try: + payload = json.loads(self.token_cache_path.read_text(encoding="utf-8")) + self._access_token = payload.get("access_token") + self._token_expires_at = float(payload.get("token_expires_at", 0)) + except (json.JSONDecodeError, OSError, ValueError): + self._access_token = None + self._token_expires_at = 0.0 + + def _save_token_cache(self) -> None: + if not self.token_cache_path: + return + self.token_cache_path.parent.mkdir(parents=True, exist_ok=True) + try: + os.chmod(self.token_cache_path.parent, 0o700) + except OSError: + pass + self.token_cache_path.write_text( + json.dumps( + { + "access_token": self._access_token, + "token_expires_at": self._token_expires_at, + }, + indent=2, + ), + encoding="utf-8", + ) + try: + os.chmod(self.token_cache_path, 0o600) + except OSError: + pass diff --git a/examples/templates/procurement_approval_agent/tests/test_basic_workflow.py b/examples/templates/procurement_approval_agent/tests/test_basic_workflow.py new file mode 100644 index 0000000000..9f6dec04e9 --- /dev/null +++ b/examples/templates/procurement_approval_agent/tests/test_basic_workflow.py @@ -0,0 +1,327 @@ +"""Basic end-to-end workflow test for Procurement Approval Agent.""" + +from __future__ import annotations + +import asyncio +import json +import os +import sqlite3 +import tempfile +from pathlib import Path + +from pytest import MonkeyPatch + +from procurement_approval_agent.agent import ProcurementApprovalAgent + + +_QB_ENV_KEYS = ( + "QUICKBOOKS_CLIENT_ID", + "QUICKBOOKS_CLIENT_SECRET", + "QUICKBOOKS_REALM_ID", + "QUICKBOOKS_REFRESH_TOKEN", + "QUICKBOOKS_ENV", + "QUICKBOOKS_CREDENTIAL_REF", +) + + +def _setup_test_data(data_dir: Path) -> None: + """Create sample budget DB and approved vendor CSV for test runs.""" + data_dir.mkdir(parents=True, exist_ok=True) + + db_path = data_dir / 
"budget_tracking.db" + with sqlite3.connect(db_path) as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS department_budget ( + department TEXT PRIMARY KEY, + allocated REAL NOT NULL, + spent REAL NOT NULL + ) + """ + ) + conn.executemany( + "INSERT OR REPLACE INTO department_budget (department, allocated, spent) VALUES (?, ?, ?)", + [ + ("engineering", 50000, 18000), + ("finance", 30000, 12000), + ("operations", 40000, 25000), + ], + ) + conn.commit() + + vendors_csv = data_dir / "approved_vendors.csv" + vendors_csv.write_text( + "vendor_name\nAcme Supplies\nTechSource LLC\nNorthwind Office Systems\nGlobal Industrial\n", + encoding="utf-8", + ) + qb_mock_path = data_dir / "qb_mock_responses.json" + if qb_mock_path.exists(): + qb_mock_path.unlink() + + +def _set_qb_creds(enabled: bool) -> None: + if enabled: + os.environ["QUICKBOOKS_CLIENT_ID"] = "mock-client-id" + os.environ["QUICKBOOKS_CLIENT_SECRET"] = "mock-client-secret" + os.environ["QUICKBOOKS_REALM_ID"] = "mock-realm-id" + else: + os.environ.pop("QUICKBOOKS_CLIENT_ID", None) + os.environ.pop("QUICKBOOKS_CLIENT_SECRET", None) + os.environ.pop("QUICKBOOKS_REALM_ID", None) + + +def _capture_qb_env() -> dict[str, str | None]: + return {key: os.environ.get(key) for key in _QB_ENV_KEYS} + + +def _restore_qb_env(previous_env: dict[str, str | None]) -> None: + for key, value in previous_env.items(): + if value is None: + os.environ.pop(key, None) + else: + os.environ[key] = value + + +def _run_workflow(mock_qb: bool, data_dir: Path) -> tuple[dict, Path]: + context = { + "item": "Laptop", + "cost": 1200, + "department": "engineering", + "requester": "alice@company.com", + "justification": "Need new laptop for ML development work", + "vendor": "TechSource LLC", + } + agent = ProcurementApprovalAgent() + result = asyncio.run(agent.run(context, mock_mode=True, mock_qb=mock_qb)) + assert result.success is True + assert isinstance(result.output, dict) + qb_mock_path = data_dir / "qb_mock_responses.json" + return 
result.output, qb_mock_path + + +def test_full_workflow_api_path_mock_mode() -> None: + previous_qb_env = _capture_qb_env() + _set_qb_creds(enabled=True) + with tempfile.TemporaryDirectory() as tmpdir: + data_dir = Path(tmpdir) / "agent-data" + _setup_test_data(data_dir) + previous_storage_root = os.environ.get("HIVE_AGENT_STORAGE_ROOT") + previous_data_dir = os.environ.get("PROCUREMENT_APPROVAL_AGENT_DATA_DIR") + os.environ["HIVE_AGENT_STORAGE_ROOT"] = tmpdir + os.environ["PROCUREMENT_APPROVAL_AGENT_DATA_DIR"] = str(data_dir) + try: + output, qb_mock_path = _run_workflow(mock_qb=True, data_dir=data_dir) + + assert output.get("budget_status") == "auto_approved" + assert output.get("vendor_approved") is True + assert output.get("po_number", "").startswith("PO-") + assert output.get("validated_request", {}).get("item") == "Laptop" + assert ( + output.get("validated_request", {}).get("requester") + == "alice@company.com" + ) + assert float(output.get("validated_request", {}).get("cost", 0)) == 1200.0 + assert output.get("sync_method") == "api" + assert output.get("qb_po_id", "").startswith("QB-") + assert output.get("sync_status") == "mock_synced" + assert len(output.get("po_files_created", [])) == 3 + assert len(output.get("notifications_created", [])) == 3 + assert qb_mock_path.exists() is True + qb_records = json.loads(qb_mock_path.read_text(encoding="utf-8")) + assert isinstance(qb_records, list) and len(qb_records) >= 1 + assert qb_records[-1]["response"]["qb_po_id"].startswith("QB-") + finally: + if previous_storage_root is None: + os.environ.pop("HIVE_AGENT_STORAGE_ROOT", None) + else: + os.environ["HIVE_AGENT_STORAGE_ROOT"] = previous_storage_root + if previous_data_dir is None: + os.environ.pop("PROCUREMENT_APPROVAL_AGENT_DATA_DIR", None) + else: + os.environ["PROCUREMENT_APPROVAL_AGENT_DATA_DIR"] = previous_data_dir + _restore_qb_env(previous_qb_env) + + +def test_full_workflow_csv_fallback_mock_mode() -> None: + previous_qb_env = _capture_qb_env() + 
_set_qb_creds(enabled=False) + with tempfile.TemporaryDirectory() as tmpdir: + data_dir = Path(tmpdir) / "agent-data" + _setup_test_data(data_dir) + previous_storage_root = os.environ.get("HIVE_AGENT_STORAGE_ROOT") + previous_data_dir = os.environ.get("PROCUREMENT_APPROVAL_AGENT_DATA_DIR") + os.environ["HIVE_AGENT_STORAGE_ROOT"] = tmpdir + os.environ["PROCUREMENT_APPROVAL_AGENT_DATA_DIR"] = str(data_dir) + try: + output, qb_mock_path = _run_workflow(mock_qb=True, data_dir=data_dir) + + assert output.get("sync_method") == "csv" + assert output.get("validated_request", {}).get("item") == "Laptop" + assert ( + output.get("validated_request", {}).get("requester") + == "alice@company.com" + ) + assert float(output.get("validated_request", {}).get("cost", 0)) == 1200.0 + assert output.get("csv_file_path", "").endswith("_qb_manual_import.csv") + assert output.get("import_instructions", "").endswith( + "_qb_import_instructions.md" + ) + assert output.get("qb_po_id") in (None, "") + assert qb_mock_path.exists() is False + + assert ( + data_dir / output["csv_file_path"].replace("data/", "") + ).exists() is True + assert ( + data_dir / output["import_instructions"].replace("data/", "") + ).exists() is True + finally: + if previous_storage_root is None: + os.environ.pop("HIVE_AGENT_STORAGE_ROOT", None) + else: + os.environ["HIVE_AGENT_STORAGE_ROOT"] = previous_storage_root + if previous_data_dir is None: + os.environ.pop("PROCUREMENT_APPROVAL_AGENT_DATA_DIR", None) + else: + os.environ["PROCUREMENT_APPROVAL_AGENT_DATA_DIR"] = previous_data_dir + _restore_qb_env(previous_qb_env) + + +def test_each_workflow_run_generates_unique_po_artifacts() -> None: + previous_qb_env = _capture_qb_env() + _set_qb_creds(enabled=False) + with tempfile.TemporaryDirectory() as tmpdir: + data_dir = Path(tmpdir) / "agent-data" + _setup_test_data(data_dir) + previous_storage_root = os.environ.get("HIVE_AGENT_STORAGE_ROOT") + previous_data_dir = os.environ.get("PROCUREMENT_APPROVAL_AGENT_DATA_DIR") + 
os.environ["HIVE_AGENT_STORAGE_ROOT"] = tmpdir + os.environ["PROCUREMENT_APPROVAL_AGENT_DATA_DIR"] = str(data_dir) + try: + first_output, _ = _run_workflow(mock_qb=True, data_dir=data_dir) + second_output, _ = _run_workflow(mock_qb=True, data_dir=data_dir) + + assert first_output["po_number"] != second_output["po_number"] + for po_output in (first_output, second_output): + assert all( + po_output["po_number"] in rel_path + for rel_path in po_output.get("po_files_created", []) + ) + finally: + if previous_storage_root is None: + os.environ.pop("HIVE_AGENT_STORAGE_ROOT", None) + else: + os.environ["HIVE_AGENT_STORAGE_ROOT"] = previous_storage_root + if previous_data_dir is None: + os.environ.pop("PROCUREMENT_APPROVAL_AGENT_DATA_DIR", None) + else: + os.environ["PROCUREMENT_APPROVAL_AGENT_DATA_DIR"] = previous_data_dir + _restore_qb_env(previous_qb_env) + + +def test_over_budget_request_is_denied(monkeypatch: MonkeyPatch) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + data_dir = Path(tmpdir) / "agent-data" + _setup_test_data(data_dir) + monkeypatch.setenv("HIVE_AGENT_STORAGE_ROOT", str(Path(tmpdir) / "storage")) + monkeypatch.setenv("PROCUREMENT_APPROVAL_AGENT_DATA_DIR", str(data_dir)) + + result = asyncio.run( + ProcurementApprovalAgent().run( + { + "item": "High-end Server", + "cost": 999999, + "department": "engineering", + "requester": "alice@company.com", + "justification": "Need new infrastructure for testing", + "vendor": "TechSource LLC", + }, + mock_mode=True, + mock_qb=True, + ) + ) + + assert result.success is False + assert result.output["budget_status"] == "denied" + + +def test_unapproved_vendor_is_rejected(monkeypatch: MonkeyPatch) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + data_dir = Path(tmpdir) / "agent-data" + _setup_test_data(data_dir) + monkeypatch.setenv("HIVE_AGENT_STORAGE_ROOT", str(Path(tmpdir) / "storage")) + monkeypatch.setenv("PROCUREMENT_APPROVAL_AGENT_DATA_DIR", str(data_dir)) + + result = asyncio.run( + 
ProcurementApprovalAgent().run( + { + "item": "Laptop", + "cost": 1200, + "department": "engineering", + "requester": "alice@company.com", + "justification": "Need new laptop for ML development work", + "vendor": "Not Approved Inc", + }, + mock_mode=True, + mock_qb=True, + ) + ) + + assert result.success is False + assert result.output["vendor_approved"] is False + + +def test_setup_wizard_runs_on_first_execution() -> None: + with tempfile.TemporaryDirectory() as tmpdir: + previous_qb_env = _capture_qb_env() + _set_qb_creds(enabled=False) + previous_storage_root = os.environ.get("HIVE_AGENT_STORAGE_ROOT") + os.environ["HIVE_AGENT_STORAGE_ROOT"] = tmpdir + try: + agent = ProcurementApprovalAgent() + agent._setup(mock_mode=True, mock_qb=True) + assert agent._graph is not None + assert agent._graph.entry_node == "setup-wizard" + finally: + if previous_storage_root is None: + os.environ.pop("HIVE_AGENT_STORAGE_ROOT", None) + else: + os.environ["HIVE_AGENT_STORAGE_ROOT"] = previous_storage_root + _restore_qb_env(previous_qb_env) + + +def test_setup_wizard_is_skipped_after_preference_saved() -> None: + with tempfile.TemporaryDirectory() as tmpdir: + previous_qb_env = _capture_qb_env() + _set_qb_creds(enabled=True) + previous_storage_root = os.environ.get("HIVE_AGENT_STORAGE_ROOT") + os.environ["HIVE_AGENT_STORAGE_ROOT"] = tmpdir + try: + context = { + "item": "Laptop", + "cost": 1200, + "department": "engineering", + "requester": "alice@company.com", + "justification": "Need new laptop for ML development work", + "vendor": "TechSource LLC", + } + first_result = asyncio.run( + ProcurementApprovalAgent().run(context, mock_mode=True, mock_qb=True) + ) + assert first_result.success is True + + setup_file = ( + Path(tmpdir) / "procurement_approval_agent" / "setup_config.json" + ) + assert setup_file.exists() is True + + next_agent = ProcurementApprovalAgent() + next_agent._setup(mock_mode=True, mock_qb=True) + assert next_agent._graph is not None + assert 
next_agent._graph.entry_node == "pre-execution-check" + finally: + if previous_storage_root is None: + os.environ.pop("HIVE_AGENT_STORAGE_ROOT", None) + else: + os.environ["HIVE_AGENT_STORAGE_ROOT"] = previous_storage_root + _restore_qb_env(previous_qb_env) diff --git a/examples/templates/procurement_approval_agent/tests/test_cli.py b/examples/templates/procurement_approval_agent/tests/test_cli.py new file mode 100644 index 0000000000..82235a0617 --- /dev/null +++ b/examples/templates/procurement_approval_agent/tests/test_cli.py @@ -0,0 +1,27 @@ +"""CLI tests for procurement approval agent.""" + +from __future__ import annotations + +from pathlib import Path + +from click.testing import CliRunner + +from procurement_approval_agent.__main__ import cli + + +def test_reset_setup_command_removes_state(monkeypatch) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + storage_root = Path.cwd() / "storage" + setup_path = storage_root / "procurement_approval_agent" / "setup_config.json" + setup_path.parent.mkdir(parents=True, exist_ok=True) + setup_path.write_text( + '{"setup_completed": true, "preferred_sync_method": "csv"}' + ) + + monkeypatch.setenv("HIVE_AGENT_STORAGE_ROOT", str(storage_root)) + result = runner.invoke(cli, ["reset-setup"]) + + assert result.exit_code == 0 + assert "Removed setup state" in result.output + assert setup_path.exists() is False diff --git a/examples/templates/procurement_approval_agent/tests/test_e2e.py b/examples/templates/procurement_approval_agent/tests/test_e2e.py new file mode 100644 index 0000000000..0e0c9fc202 --- /dev/null +++ b/examples/templates/procurement_approval_agent/tests/test_e2e.py @@ -0,0 +1,46 @@ +"""Basic structure tests for Procurement Approval Agent.""" + +from procurement_approval_agent.agent import default_agent +from procurement_approval_agent.quickbooks_api import QuickBooksAPI, QuickBooksConfig + + +def test_graph_has_required_nodes() -> None: + node_ids = {node.id for node in 
default_agent.nodes} + assert "setup-wizard" in node_ids + assert "intake" in node_ids + assert "budget-check" in node_ids + assert "manager-approval" in node_ids + assert "vendor-check" in node_ids + assert "po-generator" in node_ids + assert "integration-check" in node_ids + assert "quickbooks-sync" in node_ids + assert "csv-export" in node_ids + assert "notifications" in node_ids + + +def test_validation_passes() -> None: + result = default_agent.validate() + assert result["valid"] is True + + +def test_quickbooks_payload_includes_account_ref() -> None: + api = QuickBooksAPI( + QuickBooksConfig( + client_id="client", + client_secret="secret", + realm_id="realm", + refresh_token="refresh", + ) + ) + + payload = api._build_purchase_order_payload( + { + "po_number": "PO-TEST-001", + "vendor": "TechSource LLC", + "amount": 1200, + "currency": "USD", + } + ) + + line_detail = payload["Line"][0]["AccountBasedExpenseLineDetail"] + assert line_detail["AccountRef"]["value"] == "1" diff --git a/examples/templates/procurement_approval_agent/tests/test_monitoring.py b/examples/templates/procurement_approval_agent/tests/test_monitoring.py new file mode 100644 index 0000000000..44592b0d88 --- /dev/null +++ b/examples/templates/procurement_approval_agent/tests/test_monitoring.py @@ -0,0 +1,180 @@ +"""Tests for continuous request monitoring and auto-trigger execution.""" + +from __future__ import annotations + +import asyncio +import json +import tempfile +from pathlib import Path + +from procurement_approval_agent.monitor import RequestMonitor + + +def _request_payload() -> dict: + return { + "item": "Laptop", + "cost": 1200, + "department": "engineering", + "requester": "alice@company.com", + "justification": "Need new laptop for ML development work", + "vendor": "TechSource LLC", + } + + +def _write_request(path: Path, payload: dict) -> None: + path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + + +def test_monitor_processes_api_path(monkeypatch) -> None: + 
with tempfile.TemporaryDirectory() as tmpdir: + tmp = Path(tmpdir) + watch_dir = tmp / "watched_requests" + monkeypatch.setenv("HIVE_AGENT_STORAGE_ROOT", str(tmp / "storage")) + monkeypatch.setenv("QUICKBOOKS_CLIENT_ID", "x") + monkeypatch.setenv("QUICKBOOKS_CLIENT_SECRET", "y") + monkeypatch.setenv("QUICKBOOKS_REALM_ID", "z") + + request_file = watch_dir / "req_api.json" + watch_dir.mkdir(parents=True, exist_ok=True) + _write_request(request_file, _request_payload()) + + monitor = RequestMonitor( + watch_dir=watch_dir, + mock_mode=True, + mock_qb=True, + notify=False, + ) + results = asyncio.run(monitor.process_once()) + + assert len(results) == 1 + assert results[0].success is True + assert results[0].archive_file.parent.name == "done" + + output = json.loads(results[0].output_file.read_text(encoding="utf-8")) + assert output["output"]["sync_method"] == "api" + assert output["output"]["qb_po_id"].startswith("QB-") + + +def test_monitor_processes_csv_path(monkeypatch) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + tmp = Path(tmpdir) + watch_dir = tmp / "watched_requests" + monkeypatch.setenv("HIVE_AGENT_STORAGE_ROOT", str(tmp / "storage")) + monkeypatch.delenv("QUICKBOOKS_CLIENT_ID", raising=False) + monkeypatch.delenv("QUICKBOOKS_CLIENT_SECRET", raising=False) + monkeypatch.delenv("QUICKBOOKS_REALM_ID", raising=False) + + request_file = watch_dir / "req_csv.json" + watch_dir.mkdir(parents=True, exist_ok=True) + _write_request(request_file, _request_payload()) + + monitor = RequestMonitor( + watch_dir=watch_dir, + mock_mode=True, + mock_qb=True, + notify=False, + ) + results = asyncio.run(monitor.process_once()) + + assert len(results) == 1 + assert results[0].success is True + assert results[0].archive_file.parent.name == "done" + + output = json.loads(results[0].output_file.read_text(encoding="utf-8")) + assert output["output"]["sync_method"] == "csv" + assert output["output"]["csv_file_path"].endswith("_qb_manual_import.csv") + + +def 
test_monitor_duplicate_detection_and_force(monkeypatch) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + tmp = Path(tmpdir) + watch_dir = tmp / "watched_requests" + monkeypatch.setenv("HIVE_AGENT_STORAGE_ROOT", str(tmp / "storage")) + monkeypatch.delenv("QUICKBOOKS_CLIENT_ID", raising=False) + monkeypatch.delenv("QUICKBOOKS_CLIENT_SECRET", raising=False) + monkeypatch.delenv("QUICKBOOKS_REALM_ID", raising=False) + + watch_dir.mkdir(parents=True, exist_ok=True) + _write_request(watch_dir / "req1.json", _request_payload()) + monitor = RequestMonitor( + watch_dir=watch_dir, + mock_mode=True, + mock_qb=True, + notify=False, + force=False, + ) + first = asyncio.run(monitor.process_once()) + assert len(first) == 1 + assert first[0].success is True + + _write_request(watch_dir / "req2.json", _request_payload()) + second = asyncio.run(monitor.process_once()) + assert len(second) == 1 + assert second[0].success is False + assert "Duplicate request detected" in second[0].output.get("error", "") + + _write_request(watch_dir / "req3.json", _request_payload()) + force_monitor = RequestMonitor( + watch_dir=watch_dir, + mock_mode=True, + mock_qb=True, + notify=False, + force=True, + ) + third = asyncio.run(force_monitor.process_once()) + assert len(third) == 1 + assert third[0].success is True + + +def test_monitor_skip_process_exits_early(monkeypatch) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + tmp = Path(tmpdir) + watch_dir = tmp / "watched_requests" + monkeypatch.setenv("HIVE_AGENT_STORAGE_ROOT", str(tmp / "storage")) + + request_file = watch_dir / "req_skip.json" + watch_dir.mkdir(parents=True, exist_ok=True) + _write_request(request_file, _request_payload()) + + monitor = RequestMonitor( + watch_dir=watch_dir, + mock_mode=True, + mock_qb=True, + notify=False, + default_process_request=False, + ) + results = asyncio.run(monitor.process_once()) + + assert len(results) == 1 + assert results[0].success is True + output = 
json.loads(results[0].output_file.read_text(encoding="utf-8")) + assert output["output"]["process_request"] is False + assert "validated_request" not in output["output"] + + +def test_monitor_forces_api_path_without_env_credentials(monkeypatch) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + tmp = Path(tmpdir) + watch_dir = tmp / "watched_requests" + monkeypatch.setenv("HIVE_AGENT_STORAGE_ROOT", str(tmp / "storage")) + monkeypatch.delenv("QUICKBOOKS_CLIENT_ID", raising=False) + monkeypatch.delenv("QUICKBOOKS_CLIENT_SECRET", raising=False) + monkeypatch.delenv("QUICKBOOKS_REALM_ID", raising=False) + + request_file = watch_dir / "req_forced_api.json" + watch_dir.mkdir(parents=True, exist_ok=True) + _write_request(request_file, _request_payload()) + + monitor = RequestMonitor( + watch_dir=watch_dir, + mock_mode=True, + mock_qb=True, + notify=False, + sync_method="api", + ) + results = asyncio.run(monitor.process_once()) + + assert len(results) == 1 + assert results[0].success is True + output = json.loads(results[0].output_file.read_text(encoding="utf-8")) + assert output["output"]["sync_method"] == "api"