Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ Time window: `opened_at − 30 min`. On empty result, retries once with a 60-min
**Implemented connectors:**
- `SSHLogConnector` (`implementations/clusters/onprem/`) — provider-agnostic SSH connector for any on-premise cluster (CDP, HDP, Oracle RAC, MapR, etc.). Log dirs and SSH credentials are constructor params.
- `GCPLogConnector` (`implementations/clusters/cloud/gcp/`) — Cloud Logging API with vault-backed service account.
- `AzureLogConnector` (`implementations/clusters/cloud/azure/`) — Azure Monitor Log Analytics workspace, KQL-based queries, `DefaultAzureCredential` auth. Workspace ID resolved from vault at query time.

**Cloud stubs:** Databricks, AWS EMR, Azure Monitor — raise `NotImplementedError`, full implementations planned.
**Cloud stubs:** Databricks, AWS EMR — raise `NotImplementedError`, full implementations planned.

### Agent 3 — Classifier ✅ Implemented

Expand Down Expand Up @@ -484,9 +485,9 @@ aria/
│ │ ├── onprem/ # SSHLogConnector — any bare-metal/VM cluster (CDP, HDP, Oracle RAC, MapR, etc.)
│ │ └── cloud/
│ │ ├── gcp/ # GCPLogConnector — Cloud Logging API
│ │ ├── azure/ # ✅ AzureLogConnector — Log Analytics workspace (Azure Monitor)
│ │ ├── databricks/ # stub — planned
│ │ ├── aws/ # stub — planned
│ │ └── azure/ # stub — planned
│ │ └── aws/ # stub — planned
│ ├── itsm/
│ │ └── servicenow/ # ServiceNowConnector
│ ├── coms/
Expand All @@ -508,7 +509,9 @@ aria/
├── documentation/ # MkDocs site source (mkdocs serve)
├── infra/
│ └── terraform/
│ └── uc_testing/ # UC1 (Hadoop VMs) · UC2 (Dataproc) · UC3 (GCP native)
│ └── uc_testing/
│ ├── gcp/ # UC1 (Hadoop VMs) · UC2 (Dataproc) · UC3 (GCP native)
│ └── azure/ # UC1 (Hadoop VMs) · UC2 (HDInsight) · UC3 (Azure native)
├── ml/ # Datasets, few-shot prompt assets, evaluation scripts
├── tests/acceptance/ # ground_truth.json · round results · AC reports
├── Dockerfile # P1.5 S3 — python:3.11-slim, non-root, single stage
Expand Down Expand Up @@ -627,7 +630,7 @@ Phase 1 is complete when all of the following pass on 10 consecutive test incide
| Phase 1.5 | S1: Structured logging — structlog, `run_id`, lifecycle events, RunRecord | ✅ Done |
| Phase 1.5 | S2: Monitoring foundation — run store, REST API, Alpine.js dashboard, mode scaffold | ✅ Done |
| Phase 1.5 | S3: Docker + `ARIA_CONFIG_PATH` + `VertexAILLMClient` + LLM provider DI (incl. #84 security fix) | ✅ Done |
| Phase 1.5 | S4: Testing infrastructure — UC1/UC2/UC3 cluster wiring, KB runbooks, CMDB validation | 🔜 Planned |
| Phase 1.5 | S4: Testing infrastructure — UC1/UC2/UC3 cluster wiring (GCP + Azure), KB runbooks, AzureLogConnector wired | 🔄 In progress |
| Phase 1.5 | S5: Round 2 acceptance testing — 30 incidents on UC1 + UC2 real infrastructure | 🔜 Planned |
| Phase 1.5 | S6: GCP native connectors — BQ, Cloud Functions, Pub/Sub, GCS | 🔜 Planned |
| Phase 2 | Human validation gate + write-back to ServiceNow | 💡 Planned |
Expand Down
33 changes: 26 additions & 7 deletions api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from core.interfaces.vault import VaultInterface
from core.models import PlatformTag
from core.orchestrator.pipeline import ARIAPipeline
from implementations.clusters.cloud.azure.log_connector import AzureLogConnector
from implementations.clusters.cloud.gcp.log_connector import GCPLogConnector
from implementations.clusters.onprem.log_connector import SSHLogConnector
from implementations.itsm.servicenow.connector import ServiceNowConnector
Expand Down Expand Up @@ -75,7 +76,11 @@ def _get_vault() -> VaultInterface:
from implementations.vault.gcp_secret_manager import GCPSecretManagerVault

return GCPSecretManagerVault.from_env()
# hashicorp, aws, azure already have implementations — wire them here as they get used.
if backend == "azure":
from implementations.vault.azure_kv import AzureKeyVaultClient

return AzureKeyVaultClient.from_env()
# hashicorp, aws — implementations exist, wire when needed.
return EnvVarVault()


Expand Down Expand Up @@ -144,7 +149,10 @@ def get_agent3() -> ClassifierAgent:
"ARIA_AGENT3_MODEL env var is not set "
"(or ARIA_GLOBAL_MODEL when ARIA_LLM_MODE=global)"
)
return ClassifierAgent(llm_client=_get_llm_client(model))
return ClassifierAgent(
llm_client=_get_llm_client(model),
analyser_kb_dir=cfg.analyser_kb_dir(),
)


@lru_cache(maxsize=1)
Expand Down Expand Up @@ -228,9 +236,9 @@ def get_pipeline() -> "ARIAPipeline":
def get_agent2() -> LogExtractorAgent:
"""Build and cache the Agent 2 (Log Extractor) instance.

Registers CDP (SSH) and GCP (Cloud Logging) connectors. Missing credentials
are non-fatal at construction — connectors resolve secrets at query time
and return empty results gracefully if credentials are absent.
Registers CDP (SSH), GCP (Cloud Logging), and Azure (Log Analytics) connectors.
Missing credentials are non-fatal at construction — connectors resolve secrets
at query time and return empty results gracefully if credentials are absent.
Injects an LLM client for query planning if ARIA_AGENT2_MODEL is set.
"""
vault = _get_vault()
Expand All @@ -240,12 +248,23 @@ def get_agent2() -> LogExtractorAgent:
registry = {
PlatformTag.CDP: SSHLogConnector(
vault,
ssh_key_secret="CDP_SSH_KEY",
ssh_key_secret=cfg.cdp_ssh_key_secret(),
ssh_user=cfg.cdp_ssh_user(),
host_key_secret="CDP_HOST_KEY" if os.environ.get("CDP_HOST_KEY") else None,
log_dirs=cfg.cdp_log_dirs(),
),
PlatformTag.GCP: GCPLogConnector(vault),
# resource_types scopes Cloud Logging queries to Dataproc cluster and job logs (UC2).
# S6 will generalise this into a configurable resource_type_templates dict.
PlatformTag.GCP: GCPLogConnector(
vault,
resource_types=["cloud_dataproc_cluster", "cloud_dataproc_job"],
),
# Azure Log Analytics workspace — workspace ID resolved from AZURE_LOG_WORKSPACE_ID
# secret at query time. Covers UC2 (HDInsight) and UC3 (Azure-native) incidents.
PlatformTag.AZURE: AzureLogConnector(
vault,
workspace_id_secret="AZURE_LOG_WORKSPACE_ID",
),
}
llm = None
model = _resolve_model("2")
Expand Down
25 changes: 21 additions & 4 deletions conf_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,29 @@ gcp: # only required when connectors.log = gcp

cdp:
ssh_user: hadoop # OS user with read access to log directories below
log_dirs: # directories searched for logs on CDP cluster nodes
- /var/log/hadoop-hdfs # leave unset to use these defaults
- /var/log/hadoop-yarn
ssh_key_secret: CDP_SSH_KEY # vault key name for the SSH private key PEM.
# EnvVarVault: reads os.environ[ssh_key_secret] directly.
# GCPSecretManagerVault: normalises underscores→hyphens and
# prepends 'aria-', so 'CDP_SSH_KEY' → GCP secret 'aria-cdp-ssh-key'.
# UC1 (TF): TF provisions secret 'aria-uc1-ssh-private-key'.
# Set ssh_key_secret: CDP_UC1_SSH_PRIVATE_KEY to match, or
# rename the TF secret to 'aria-cdp-ssh-key'.
log_dirs: # directories searched for logs on CDP cluster nodes.
# UC1 (TF Hadoop VMs) use subdirectory paths — set these
# at deployment time from TF startup script:
- /var/log/hadoop/hdfs # TF UC1 NameNode / DataNode logs
- /var/log/hadoop/yarn # TF UC1 ResourceManager / NodeManager logs
- /var/log/hive
- /var/log/oozie
- /var/log/spark
- /var/log/kafka
- /var/log/zookeeper
- /var/log/oozie
- /var/log/nifi

knowledge_base:
analyser_kb_dir: data/knowledge_base/analyser_kb # labeled log excerpts for Agent 3 few-shot prompting
# doubles as fine-tuning corpus for the future
# specialist Agent 3 model (roadmap: post-POC)

slack:
channel_id: <your-slack-channel-id> # channel where Agent 4 posts notifications
68 changes: 58 additions & 10 deletions core/agents/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"""

import json
import time
from pathlib import Path
from typing import Any

from core.exceptions import ClassificationError
Expand All @@ -23,6 +25,21 @@
{"oom", "cpu", "disk", "network", "auth", "db_lock", "pipeline", "unknown"}
)


def _load_analyser_kb(directory: str) -> str:
"""Load analyser_kb files and join them as a single few-shot reference block.

Files are sorted alphabetically so the ordering is deterministic. Returns an
empty string when the directory is missing — the classifier degrades gracefully.
"""
path = Path(directory)
if not path.is_dir():
return ""
return "\n\n---\n\n".join(
f.read_text(encoding="utf-8").strip() for f in sorted(path.glob("*.md"))
)


_SYSTEM_PROMPT = """\
You are ARIA, an AI operations assistant. Classify the root cause of the incident \
from the metadata and log evidence below.
Expand Down Expand Up @@ -58,14 +75,23 @@ class ClassifierAgent:
falls back to stub behaviour (dry-run compatibility).
"""

def __init__(self, llm_client: LLMClientInterface | None = None) -> None:
def __init__(
self,
llm_client: LLMClientInterface | None = None,
analyser_kb_dir: str | None = None,
) -> None:
"""Initialise the classifier.

Args:
llm_client: LLM client used to call the model. When None, the agent
falls back to stub behaviour (error_class='unknown', LOW confidence).
analyser_kb_dir: Path to analyser_kb directory of labeled log excerpts.
When provided, examples are injected into every LLM call as
few-shot reference context. Also serves as the training data
corpus for the future fine-tuned Agent 3 model.
"""
self._llm = llm_client
self._few_shot_examples: str = _load_analyser_kb(analyser_kb_dir) if analyser_kb_dir else ""

@log_agent_lifecycle("agent3")
def run(self, state: PipelineState) -> PipelineState:
Expand Down Expand Up @@ -109,16 +135,36 @@ def run(self, state: PipelineState) -> PipelineState:
logger.info("classifier: running for %s", state.incident_number)

messages = self._build_messages(state)
try:
raw = self._llm.complete(
messages,
system=_SYSTEM_PROMPT,
temperature=0.0,
max_tokens=1024,
# Retry once with a 1-second backoff before surfacing as ClassificationError (#83).
# A transient LLM failure must not kill Agent 4 — the notify-only guarantee requires
# Agent 4 to always run, even when Agent 3 cannot classify.
_llm_exc: Exception | None = None
for attempt in range(2):
try:
raw = self._llm.complete(
messages,
system=_SYSTEM_PROMPT,
temperature=0.0,
max_tokens=1024,
)
_llm_exc = None
break
except Exception as exc:
_llm_exc = exc
if attempt == 0:
logger.warning(
"classifier: LLM call failed for %s (attempt 1/2), retrying: %s",
state.incident_number,
exc,
)
time.sleep(1)
if _llm_exc is not None:
logger.error(
"classifier: LLM call failed for %s after 2 attempts: %s",
state.incident_number,
_llm_exc,
)
except Exception as exc:
logger.error("classifier: LLM call failed for %s: %s", state.incident_number, exc)
raise ClassificationError(f"LLM call failed: {exc}") from exc
raise ClassificationError(f"LLM call failed: {_llm_exc}") from _llm_exc

try:
classification, log_request = self._parse_response(raw)
Expand Down Expand Up @@ -193,6 +239,8 @@ def _build_messages(self, state: PipelineState) -> list[dict[str, str]]:
f"Affected CI: {affected_ci}\n\n"
f"Log evidence — {log_section}"
)
if self._few_shot_examples:
content += f"\n\n## Reference log examples\n\n{self._few_shot_examples}"
return [{"role": "user", "content": content}]

def _parse_response(self, raw: str) -> tuple[ClassificationResult | None, LogRequest | None]:
Expand Down
29 changes: 28 additions & 1 deletion core/agents/log_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,12 @@ def _plan_with_llm(self, state: PipelineState) -> LogQueryPlan:
)

data = json.loads(response)
raw_paths = list(data.get("log_paths", []))
return LogQueryPlan(
connector_name=str(data["connector_name"]),
log_paths=list(data.get("log_paths", [])),
# Validate LLM-planned paths against safe prefixes to prevent path-injection
# from adversarial log content coercing the LLM toward /etc/ or ~/ targets (#85).
log_paths=_validate_log_paths(raw_paths),
keywords=list(data.get("keywords", [])),
time_window_minutes=int(data.get("time_window_minutes", _DEFAULT_WINDOW)),
reasoning=str(data.get("reasoning", "")),
Expand Down Expand Up @@ -496,3 +499,27 @@ def _empty(host: str, platform_tag: PlatformTag) -> LogQueryResult:
total_scanned=0,
confidence=ConfidenceBand.LOW,
)


# Allowed log path prefixes for LLM-planned paths (#85).
# Paths outside these prefixes are dropped before being passed to connectors.
# Mirrors the CI-name allowlist used for host resolution in _resolve_ci_from_request().
_SAFE_LOG_PREFIXES: tuple[str, ...] = ("/var/log/",)


def _validate_log_paths(paths: list[str]) -> list[str]:
"""Filter LLM-planned log paths to known-safe directory prefixes.

Prevents adversarial log content from coercing the LLM into planning paths
that point outside log directories (e.g. /etc/ssh/, ~/.ssh/). Any path that
does not start with an allowed prefix is dropped with a warning.
"""
safe = [p for p in paths if any(p.startswith(prefix) for prefix in _SAFE_LOG_PREFIXES)]
dropped = len(paths) - len(safe)
if dropped:
logger.warning(
"Agent 2: dropped %d LLM-planned log path(s) outside allowed prefixes %s",
dropped,
_SAFE_LOG_PREFIXES,
)
return safe
27 changes: 27 additions & 0 deletions core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ def cdp_ssh_user() -> str:
return _get(["cdp", "ssh_user"], "CDP_SSH_USER", "hadoop")


def cdp_ssh_key_secret() -> str:
"""Return the vault key name used to retrieve the CDP SSH private key.

For EnvVarVault: the key name is read directly from the environment.
For GCPSecretManagerVault: underscores are normalised to hyphens and an
'aria-' prefix is added — e.g. 'CDP_SSH_KEY' → GCP secret 'aria-cdp-ssh-key'.
The TF-provisioned secret name for UC1 is 'aria-uc1-ssh-private-key', which
requires setting cdp.ssh_key_secret: CDP_UC1_SSH_PRIVATE_KEY in conf.yaml
(resolves to 'aria-cdp-uc1-ssh-private-key') or renaming the TF secret.
Defaults to 'CDP_SSH_KEY' (backward-compatible with pre-S4 deployments).
"""
return _get(["cdp", "ssh_key_secret"], "CDP_SSH_KEY_SECRET", "CDP_SSH_KEY")


def cdp_log_dirs() -> list[str]:
"""Return directories to search for logs on CDP cluster nodes.

Expand Down Expand Up @@ -212,6 +226,19 @@ def run_db_path() -> str:
return _get(["runs", "db_path"], "ARIA_RUN_DB_PATH", "data/runs.db")


# ── Knowledge Base ────────────────────────────────────────────────────────────


def analyser_kb_dir() -> str | None:
"""Return path to the analyser_kb directory of labeled log excerpts for Agent 3 few-shot prompting.

Reads knowledge_base.analyser_kb_dir from conf.yaml / ARIA_ANALYSER_KB_DIR env var.
None when not configured — Agent 3 classifies without few-shot examples.
"""
val = _get(["knowledge_base", "analyser_kb_dir"], "ARIA_ANALYSER_KB_DIR")
return val or None


# ── GCP ───────────────────────────────────────────────────────────────────────


Expand Down
30 changes: 24 additions & 6 deletions core/orchestrator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from langgraph.graph.state import CompiledStateGraph

import core.config as cfg
from core.exceptions import ClassificationError
from core.interfaces.run_state_store import RunStateStoreInterface
from core.interfaces.run_store import RunStoreInterface
from core.logging_config import configure_logging
Expand Down Expand Up @@ -129,13 +130,30 @@ def _agent2_node(self, state: PipelineState) -> dict:
}

def _agent3_node(self, state: PipelineState) -> dict:
"""LangGraph node wrapper for Agent 3. Returns classification and any pending log request."""
"""LangGraph node wrapper for Agent 3. Returns classification and any pending log request.

ClassificationError is caught here rather than propagated — if Agent 3 cannot
classify, the error is recorded in state and the pipeline routes to Agent 4 so
the notify-only guarantee is never violated (#83).
"""
self._track_agent(state, "agent3")
result = self._agent3.run(state)
return {
"classification": result.classification,
"pending_log_request": result.pending_log_request,
}
try:
result = self._agent3.run(state)
return {
"classification": result.classification,
"pending_log_request": result.pending_log_request,
}
except ClassificationError as exc:
logger.error(
"agent3: ClassificationError for %s — routing to agent4 with error: %s",
state.incident_number,
exc,
)
return {
"classification": None,
"pending_log_request": None,
"error": str(exc),
}

def _agent4_node(self, state: PipelineState) -> dict:
"""LangGraph node wrapper for Agent 4. Returns notification_sent and any delivery error."""
Expand Down
Loading