Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions python/google-adk/sample-agent/.gcloudignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Cloud Run source upload allowlist.
# Ignore everything, then re-include only what the container needs at build/run.
# This forces the pip buildpack (requirements.txt) and avoids uploading the
# uv.lock (stale, missing mcp), the local .venv, secrets (.env), and a365 config.
*

!*.py
!Procfile
!requirements.txt
!.python-version
!ToolingManifest.json

# Re-exclude helper/local-only python files matched by !*.py above.
_freeze.py
1 change: 1 addition & 0 deletions python/google-adk/sample-agent/.python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.13
1 change: 1 addition & 0 deletions python/google-adk/sample-agent/Procfile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
web: python main.py
15 changes: 15 additions & 0 deletions python/google-adk/sample-agent/_freeze.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import importlib.metadata as m
Comment thread
evanmitchellgithub marked this conversation as resolved.
Outdated

skip = {"sample-google-adk", "pip", "setuptools", "wheel"}
# Windows-only packages present in the local venv that have no Linux build.
skip |= {"pywin32", "pywin32-ctypes", "pypiwin32", "pywinpty", "winsdk", "windows-curses"}
lines = []
for d in m.distributions():
name = d.metadata["Name"]
if not name or name.lower() in skip:
continue
lines.append(f"{name}=={d.version}")
lines = sorted(set(lines), key=str.lower)
with open("requirements.txt", "w", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
print("count", len(lines))
66 changes: 64 additions & 2 deletions python/google-adk/sample-agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,70 @@ async def invoke_agent_with_scope(
# Fall back to env vars so observability baggage is still populated.
recipient = context.activity.recipient
tenant_id = getattr(recipient, "tenant_id", None) or os.getenv("AGENTIC_TENANT_ID", "")
agent_id = getattr(recipient, "agentic_user_id", None) or os.getenv("AGENTIC_USER_ID", "")
with BaggageBuilder().tenant_id(tenant_id).agent_id(agent_id).build():
# The A365 observability ingestion endpoint enforces ValidateAgentIdentity:
# the {agentId} in the export URL (and every gen_ai.agent.id span attribute)
# must equal the application id (azp/appid) carried in the agentic OBO token.
# That token's azp is the agent_app_instance_id (AGENTIC_APP_ID), NOT the
# agentic_user_id — so the observability agent id must be the app instance id.
agent_id = getattr(recipient, "agentic_app_id", None) or os.getenv("AGENTIC_APP_ID", "")

# Identity enrichment so the exported spans carry the dimensions the Agent 365 /
# IDEAs activity rollup needs to attribute usage. Without these, the admin-center
# "Activity" tabs stay empty ("When people are using agents, their usage data will
# show up here") even though spans ingest successfully:
# - user.id (the invoking human) -> "active users" metric
# - microsoft.a365.agent.blueprint.id -> blueprint-level Activity tab
# - microsoft.agent.user.id / email -> agent (instance) attribution
from_prop = context.activity.from_property
user_aad_object_id = getattr(from_prop, "aad_object_id", None) or getattr(from_prop, "id", None)
user_display_name = getattr(from_prop, "name", None)
# Blueprint id (a365.generated.config.json → agentBlueprintId). This is the
# blueprint/template id, distinct from AGENTIC_APP_ID (the instance app id).
# The service-connection CLIENTID already carries the blueprint id, so we reuse
# it as the fallback and no extra env var is required for deployment.
blueprint_id = (
getattr(recipient, "agent_blueprint_id", None)
or os.getenv("AGENT_BLUEPRINT_ID")
or os.getenv("CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTID", "")
)
agentic_user_id = getattr(recipient, "agentic_user_id", None) or os.getenv("AGENTIC_USER_ID", "")
agentic_user_email = os.getenv("AGENTIC_UPN", "")

baggage = (
BaggageBuilder()
.tenant_id(tenant_id)
.agent_id(agent_id)
.agent_blueprint_id(blueprint_id)
.agentic_user_id(agentic_user_id)
.agentic_user_email(agentic_user_email)
.user_id(user_aad_object_id)
.user_name(user_display_name)
)
with baggage.build():
# When running with an agentic auth handler (production), exchange for an
# observability-scoped token and cache it so the A365 exporter can authenticate
# its span export for this (tenant, agent). Best-effort: a failure here must
# not break the turn — it only means spans aren't exported this cycle.
if auth_handler_name and tenant_id and agent_id:
try:
from microsoft_agents_a365.runtime.environment_utils import (
get_observability_authentication_scope,
)
from token_cache import cache_agentic_token

exaau_token = await auth.exchange_token(
context,
scopes=get_observability_authentication_scope(),
auth_handler_id=auth_handler_name,
)
if exaau_token and getattr(exaau_token, "token", None):
cache_agentic_token(tenant_id, agent_id, exaau_token.token)
logger.info("Cached observability token for agent_id=%s", agent_id)
else:
logger.warning("Observability token exchange returned no token")
except Exception as e:
logger.warning("Observability token exchange failed: %s", e)

return await self.invoke_agent(message=message, auth=auth, auth_handler_name=auth_handler_name, context=context)

async def _cleanup_agent(self, agent: Agent):
Expand Down
272 changes: 272 additions & 0 deletions python/google-adk/sample-agent/debug_probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
Egress diagnostic probe for the Agent 365 Observability ingestion endpoint.

Purpose
-------
The agent emits correctly-shaped, identity-stamped spans, but the A365 span
exporter's HTTPS POST to ``agent365.svc.cloud.microsoft`` was observed failing
with ``SSL: UNEXPECTED_EOF_WHILE_READING`` (a TLS-handshake reset). Every prior
handshake test was run from the corp network or against a *different* host
(``login.microsoftonline.com``) — never a raw handshake from inside the
Cloud Run container to the ingestion host itself.

This module reproduces the export network path from the **exact** runtime and
egress of the live agent. It is mounted as ``GET /debug/otlp-probe`` and runs,
in order:

1. **DNS** — what the container resolves ``agent365.svc.cloud.microsoft`` to.
2. **Raw TLS handshake** — a bare ``socket`` + ``ssl`` connection (default
context, then TLS 1.2 forced) to ``host:443`` with SNI, reporting the
negotiated protocol/cipher and the peer-certificate subject. This isolates
the handshake from any HTTP/auth concerns.
3. **HTTPS POST** — a minimal single-span OTLP/HTTP+JSON request via the same
``requests`` library the SDK exporter uses, to the same URL the SDK builds.
If a cached observability token exists (from a prior authenticated turn) it
is attached; otherwise the POST runs token-less (a ``401``/``403`` still
proves TLS + HTTP reachability — the opposite of a handshake reset).

Interpreting results
---------------------
* Any **HTTP status code** at step 3 (even 401/403/400) ⇒ TLS and the network
path from Cloud Run are healthy; the "Microsoft blocks GCP at TLS" theory is
disproven and the original failure was transient or exporter-level.
* A reproduced **``UNEXPECTED_EOF`` / handshake error** at step 2 or 3 ⇒ the
network hypothesis is confirmed *from the real egress path*, giving hard
evidence for an egress-IP allowlist or relay decision.

Security
--------
The route is gated by the ``DEBUG_PROBE_KEY`` env var (passed as ``?key=``).
If the env var is unset the route returns 404 so it can't be probed on a public
URL. The response never includes the bearer token or any cert private material.
This is a temporary diagnostic — remove the route (or unset the key) afterward.
"""

import os
import socket
import ssl
import time
import traceback
import logging

import requests

logger = logging.getLogger(__name__)

# The ingestion host the SDK exporter targets by default. Kept in sync with
# DEFAULT_ENDPOINT_URL in microsoft_agents_a365.observability.core.exporters.
_DEFAULT_HOST = "agent365.svc.cloud.microsoft"
_HTTP_TIMEOUT_SECONDS = 30
_TLS_TIMEOUT_SECONDS = 15


def _resolve_endpoint() -> tuple[str, str, str, str]:
"""Return (host, full_url, tenant_id, agent_id) matching the SDK exporter.

Uses the SDK's own URL builder and domain-override handling so the probe
hits the identical URL the exporter would, including the delegated
``/observability`` path and ``api-version=1``.
"""
tenant_id = os.getenv("AGENTIC_TENANT_ID", "")
agent_id = os.getenv("AGENTIC_USER_ID", "")

host = _DEFAULT_HOST
endpoint = f"https://{_DEFAULT_HOST}"
try:
from microsoft_agents_a365.observability.core.exporters.utils import (
build_export_url,
get_validated_domain_override,
)

override = get_validated_domain_override()
if override:
endpoint = override if "://" in override else f"https://{override}"
# use_s2s_endpoint=False mirrors the exporter default (delegated/OBO path).
full_url = build_export_url(endpoint, agent_id, tenant_id, False)
except Exception as e: # pragma: no cover - defensive
logger.warning("Falling back to manual URL build: %s", e)
full_url = (
f"{endpoint}/observability/tenants/{tenant_id}"
f"/otlp/agents/{agent_id}/traces?api-version=1"
)

# Derive the host actually being dialed (honors a domain override).
try:
from urllib.parse import urlparse

host = urlparse(full_url).hostname or _DEFAULT_HOST
except Exception:
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
pass

return host, full_url, tenant_id, agent_id


def _probe_dns(host: str) -> dict:
result: dict = {"host": host}
try:
infos = socket.getaddrinfo(host, 443, proto=socket.IPPROTO_TCP)
addrs = sorted({info[4][0] for info in infos})
result["resolved"] = addrs
result["ok"] = True
except Exception as e:
result["ok"] = False
result["error"] = f"{type(e).__name__}: {e}"
return result


def _probe_tls(host: str, *, force_tls12: bool) -> dict:
"""Open a raw TLS connection and report the negotiated parameters.

No HTTP is sent — this isolates the TLS handshake from auth/HTTP so an
``UNEXPECTED_EOF`` here is unambiguously a handshake-level failure.
"""
label = "tls1_2_forced" if force_tls12 else "tls_default"
out: dict = {"variant": label}
start = time.monotonic()
try:
ctx = ssl.create_default_context()
if force_tls12:
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.maximum_version = ssl.TLSVersion.TLSv1_2
with socket.create_connection((host, 443), timeout=_TLS_TIMEOUT_SECONDS) as sock:
with ctx.wrap_socket(sock, server_hostname=host) as tls:

Check failure

Code scanning / CodeQL

Use of insecure SSL/TLS version High

Insecure SSL/TLS protocol version TLSv1 allowed by
call to ssl.create_default_context
.
Insecure SSL/TLS protocol version TLSv1_1 allowed by
call to ssl.create_default_context
.
Comment thread
evanmitchellgithub marked this conversation as resolved.
Fixed
cert = tls.getpeercert() or {}
subject = dict(x[0] for x in cert.get("subject", []))
issuer = dict(x[0] for x in cert.get("issuer", []))
out.update(
ok=True,
negotiated_protocol=tls.version(),
cipher=tls.cipher()[0] if tls.cipher() else None,
alpn=tls.selected_alpn_protocol(),
peer_cert_cn=subject.get("commonName"),
peer_cert_issuer=issuer.get("commonName"),
elapsed_ms=round((time.monotonic() - start) * 1000),
)
except Exception as e:
out.update(
ok=False,
error_type=type(e).__name__,
error=str(e),
elapsed_ms=round((time.monotonic() - start) * 1000),
)
return out


def _minimal_otlp_body(tenant_id: str, agent_id: str) -> str:
"""Smallest valid OTLP/HTTP+JSON body: a single ``invoke_agent`` span.

Mirrors the shape documented in the direct-OTel integration guide so a 200
response would be a genuine accept (subject to the usual downstream drop
conditions). Times are Unix-epoch-nanosecond strings.
"""
import json

now_ns = time.time_ns()
span = {
"traceId": "0af7651916cd43dd8448eb211c80319c",
"spanId": "b7ad6b7169203331",
"name": "invoke_agent probe",
"kind": 1,
"startTimeUnixNano": str(now_ns - 1_000_000),
"endTimeUnixNano": str(now_ns),
"attributes": [
{"key": "gen_ai.operation.name", "value": {"stringValue": "invoke_agent"}},
{"key": "microsoft.tenant.id", "value": {"stringValue": tenant_id}},
{"key": "gen_ai.agent.id", "value": {"stringValue": agent_id}},
],
"status": {"code": 1},
}
body = {
"resourceSpans": [
{
"resource": {"attributes": []},
"scopeSpans": [{"scope": {"name": "debug-probe"}, "spans": [span]}],
}
]
}
return json.dumps(body, separators=(",", ":"))


def _probe_https_post(full_url: str, tenant_id: str, agent_id: str) -> dict:
"""POST a minimal span using the same ``requests`` stack as the exporter."""
out: dict = {"url_path": full_url.split("?")[0]}
body = _minimal_otlp_body(tenant_id, agent_id)
headers = {"content-type": "application/json", "connection": "close"}

token = None
try:
from token_cache import get_cached_agentic_token

token = get_cached_agentic_token(tenant_id, agent_id)
except Exception as e:
out["token_lookup_error"] = f"{type(e).__name__}: {e}"
out["token_attached"] = bool(token)
if token:
headers["authorization"] = f"Bearer {token}"

start = time.monotonic()
try:
resp = requests.post(
full_url,
data=body.encode("utf-8"),
headers=headers,
timeout=_HTTP_TIMEOUT_SECONDS,
)
out.update(
ok=True,
http_status=resp.status_code,
elapsed_ms=round((time.monotonic() - start) * 1000),
correlation_id=(
resp.headers.get("x-ms-correlation-id")
or resp.headers.get("request-id")
or "N/A"
),
# Body is small (partialSuccess JSON or an error page); cap it so we
# never echo anything large. Never contains the bearer token.
response_body=resp.text[:2000],
)
except Exception as e:
out.update(
ok=False,
error_type=type(e).__name__,
error=str(e),
elapsed_ms=round((time.monotonic() - start) * 1000),
traceback=traceback.format_exc()[-1500:],
)
return out


def run_probe() -> dict:
"""Run the full DNS → TLS → HTTPS diagnostic and return a JSON-able dict."""
host, full_url, tenant_id, agent_id = _resolve_endpoint()
summary = {
"target_host": host,
"tenant_id_present": bool(tenant_id),
"agent_id_present": bool(agent_id),
"dns": _probe_dns(host),
"tls_default": _probe_tls(host, force_tls12=False),
"tls_1_2_forced": _probe_tls(host, force_tls12=True),
"https_post": _probe_https_post(full_url, tenant_id, agent_id),
}

# One-line verdict to make the log/JSON instantly readable.
post = summary["https_post"]
if post.get("ok"):
summary["verdict"] = (
f"TLS + HTTP reachable from this egress (HTTP {post.get('http_status')}). "
"Handshake-reset theory DISPROVEN — investigate exporter/auth/transient."
)
elif summary["tls_default"].get("ok"):
summary["verdict"] = (
"Raw TLS handshake succeeded but the HTTPS POST failed — "
"likely HTTP/auth/proxy layer, not a handshake reset."
)
else:
summary["verdict"] = (
"Raw TLS handshake FAILED from this egress — network hypothesis "
"CONFIRMED from the real Cloud Run path. Evidence for allowlist/relay."
)
return summary
Loading
Loading