diff --git a/crates/scaffolder-core/src/telemetry.rs b/crates/scaffolder-core/src/telemetry.rs index e347269fe..bff40b3e1 100644 --- a/crates/scaffolder-core/src/telemetry.rs +++ b/crates/scaffolder-core/src/telemetry.rs @@ -119,7 +119,7 @@ fn build_user_properties(tools_version: &str, device_id: &str) -> serde_json::Va "env": std::env::var("III_ENV").unwrap_or_else(|_| "unknown".to_string()), "install_method": detect_install_method(), "cli_version": tools_version, - "host_user_id": std::env::var("III_HOST_USER_ID").ok(), + "host_device_id": std::env::var("III_HOST_DEVICE_ID").ok(), }) } diff --git a/engine/docker-compose.prod.yml b/engine/docker-compose.prod.yml index b19202874..bf3d96f7a 100644 --- a/engine/docker-compose.prod.yml +++ b/engine/docker-compose.prod.yml @@ -20,6 +20,12 @@ services: - ./config.prod.yaml:/app/config.yaml:ro environment: - III_EXECUTION_CONTEXT=docker + # Pass the host's anonymous telemetry device_id into the container. + # Reported to Amplitude as `host_device_id` so container sessions can + # be attributed back to the host without requiring a mount (which + # breaks on the distroless nonroot uid). Populate from your shell or + # a .env file alongside this compose file. + - III_HOST_DEVICE_ID=${III_HOST_DEVICE_ID:-} healthcheck: test: ["CMD-SHELL", "nc -z 127.0.0.1 3111 || exit 1"] interval: 10s diff --git a/engine/docker-compose.yml b/engine/docker-compose.yml index 5f59c4cbd..cb3d6cca0 100644 --- a/engine/docker-compose.yml +++ b/engine/docker-compose.yml @@ -13,6 +13,13 @@ services: environment: - RUST_LOG=info - III_EXECUTION_CONTEXT=docker + # Pass the host's anonymous telemetry device_id into the container. + # Amplitude reports it as `host_device_id` (not `device_id`) so + # container sessions can be attributed back to the host that + # launched them without requiring a bind-mount (which breaks on the + # distroless nonroot uid). Populate from your shell, e.g.: + # export III_HOST_DEVICE_ID=$(iii telemetry device-id) + - III_HOST_DEVICE_ID=${III_HOST_DEVICE_ID:-} depends_on: redis: condition: service_healthy diff --git a/engine/src/main.rs b/engine/src/main.rs index 27cd78c33..916e8ac78 100644 --- a/engine/src/main.rs +++ b/engine/src/main.rs @@ -114,6 +114,27 @@ enum Commands { #[arg(name = "command")] target: Option, }, + + #[command(hide = true)] + Telemetry(TelemetryArgs), +} + +#[derive(clap::Args, Debug)] +struct TelemetryArgs { + #[command(subcommand)] + command: TelemetryCommands, +} + +#[derive(Subcommand, Debug)] +enum TelemetryCommands { + /// Print the current host device_id used for telemetry. + /// + /// Intended for scripting: `export III_HOST_DEVICE_ID=$(iii telemetry device-id)`. + /// On first invocation this persists a new device_id to ~/.iii/telemetry.yaml + /// (same as a normal engine start). When run inside a container the printed + /// value will be a container-scoped id, so run it on the host. + #[command(hide = true)] + DeviceId, } fn should_init_logging_from_engine_config(cli: &Cli) -> bool { @@ -190,6 +211,15 @@ async fn main() -> anyhow::Result<()> { let exit_code = cli::handle_update(target.as_deref()).await; std::process::exit(exit_code); } + Some(Commands::Telemetry(args)) => { + match args.command { + TelemetryCommands::DeviceId => { + let id = iii::workers::telemetry::environment::get_or_create_device_id(); + println!("{id}"); + } + } + Ok(()) + } None => run_serve(&cli_args).await, } } @@ -197,7 +227,7 @@ async fn main() -> anyhow::Result<()> { #[cfg(test)] mod tests { use super::*; - use clap::Parser; + use clap::{CommandFactory, Parser}; use iii::workers::worker::DEFAULT_PORT; #[test] @@ -471,6 +501,59 @@ mod tests { ); } + #[test] + fn hidden_telemetry_device_id_parses() { + let cli = Cli::try_parse_from(["iii", "telemetry", "device-id"]) + .expect("should parse hidden telemetry device-id subcommand"); + match cli.command { + Some(Commands::Telemetry(args)) => match args.command { + TelemetryCommands::DeviceId => {} + }, + _ => panic!("expected Telemetry(DeviceId) subcommand"), + } + } + + #[test] + fn telemetry_without_subcommand_fails() { + let result = Cli::try_parse_from(["iii", "telemetry"]); + assert!( + result.is_err(), + "'iii telemetry' with no subcommand should fail" + ); + } + + #[test] + fn telemetry_unknown_subcommand_fails() { + let result = Cli::try_parse_from(["iii", "telemetry", "bogus"]); + assert!( + result.is_err(), + "'iii telemetry bogus' should fail — only device-id is valid" + ); + } + + #[test] + fn telemetry_is_hidden_from_top_level_help() { + let mut cmd = Cli::command(); + let top_help = cmd.render_help().to_string(); + assert!( + !top_help.contains("telemetry"), + "top-level help should not mention hidden 'telemetry' subcommand, got:\n{top_help}" + ); + } + + #[test] + fn device_id_is_hidden_from_telemetry_help() { + let mut cmd = Cli::command(); + let telemetry = cmd + .find_subcommand_mut("telemetry") + .expect("telemetry subcommand should exist (hidden)"); + let help = telemetry.render_help().to_string(); + assert!( + !help.contains("device-id"), + "'iii telemetry --help' should not mention hidden 'device-id' subcommand, got:\n{help}" + ); + } + #[test] fn update_iii_cli_target_is_accepted() { // Users with old iii-cli may type "iii update iii-cli" — this must diff --git a/engine/src/workers/telemetry/environment.rs b/engine/src/workers/telemetry/environment.rs index f1188117d..84971beaf 100644 --- a/engine/src/workers/telemetry/environment.rs +++ b/engine/src/workers/telemetry/environment.rs @@ -5,8 +5,6 @@ // See LICENSE and PATENTS files for details. use machineid_rs::{Encryption, HWIDComponent, IdBuilder}; -use sha2::{Digest, Sha256}; - const TELEMETRY_SCHEMA_VERSION: u8 = 2; const DEVICE_ID_SALT: &str = "iii-machine-id"; const EXECUTION_CONTEXT_ENV: &str = "III_EXECUTION_CONTEXT"; @@ -119,19 +117,6 @@ fn is_container_environment() -> bool { ) } -fn container_hostname() -> String { - std::env::var("HOSTNAME") - .ok() - .filter(|h| !h.is_empty()) - .or_else(|| { - std::fs::read_to_string("/etc/hostname") - .ok() - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) - }) - .unwrap_or_else(|| "unknown".to_string()) -} - fn find_project_root() -> Option { std::env::var("III_PROJECT_ROOT") .ok() @@ -181,39 +166,63 @@ fn write_device_id_to_project_ini(device_id: &str) { write_atomic(&path, &new_contents); } -fn salted_sha256(input: &str) -> String { - let mut hasher = Sha256::new(); - hasher.update(input); - hasher.update(DEVICE_ID_SALT); - format!("{:x}", hasher.finalize()) +pub fn find_host_device_id() -> Option { + std::env::var("III_HOST_DEVICE_ID") + .ok() + .filter(|s| !s.is_empty()) + .or_else(find_project_ini_device_id) } -fn generate_container_device_id() -> String { - let hostname = container_hostname(); +const UNKNOWN_CONTAINER_DEVICE_ID: &str = "unknown-container"; +const UNKNOWN_HOST_DEVICE_ID: &str = "unknown-host"; +const UNKNOWN_HOSTNAME: &str = "unknown-hostname"; - if let Some(host_device_id) = find_project_ini_device_id() { - return salted_sha256(&format!("{host_device_id}-{hostname}")); - } - - let base = match std::env::var("III_HOST_USER_ID") +fn detect_container_hostname() -> Option { + hostname::get() .ok() + .and_then(|s| s.into_string().ok()) + .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) - { - Some(host_id) => format!("{host_id}-{hostname}"), - None => hostname, +} + +fn container_device_id_for( + host_device_id: Option, + container_hostname: Option, +) -> String { + // Format: `container-{host_id}-{hostname}`. The host_id piece is already a + // salted SHA256 (either from machineid-rs on the host or pasted through + // III_HOST_DEVICE_ID), so it's embedded verbatim — no re-hashing. The + // hostname piece lets us distinguish individual container instances on the + // same host in Amplitude. + // + // When no host identity is available we collapse all such containers onto + // a single `unknown-container` device regardless of hostname — that + // sentinel signals a misconfigured deployment and we don't want it to + // fan out into many devices. When the hostname is unavailable but the + // host_id is, we still emit a well-formed id using `unknown-hostname`. + let Some(host_id) = host_device_id else { + return UNKNOWN_CONTAINER_DEVICE_ID.to_string(); }; + let hostname = container_hostname.unwrap_or_else(|| UNKNOWN_HOSTNAME.to_string()); + format!("container-{host_id}-{hostname}") +} - format!("docker-{}", salted_sha256(&base)) +fn host_device_id_for(machine_id: Option) -> String { + // Same rationale as `container_device_id_for`: when machineid-rs + // can't produce a stable hardware id, collapse to a single + // `unknown-host` bucket instead of generating a fresh UUID. + machine_id.unwrap_or_else(|| UNKNOWN_HOST_DEVICE_ID.to_string()) +} + +fn generate_container_device_id() -> String { + container_device_id_for(find_host_device_id(), detect_container_hostname()) } fn generate_new_device_id() -> String { if is_container_environment() { return generate_container_device_id(); } - if let Some(machine_id) = machine_id_from_machineid_rs() { - return machine_id; - } - format!("fallback-{}", uuid::Uuid::new_v4()) + host_device_id_for(machine_id_from_machineid_rs()) } fn build_fresh_v2_yaml() -> TelemetryYaml { @@ -352,7 +361,7 @@ pub struct EnvironmentInfo { pub cpu_cores: usize, pub os: String, pub arch: String, - pub host_user_id: Option, + pub host_device_id: Option, } impl EnvironmentInfo { @@ -366,10 +375,7 @@ impl EnvironmentInfo { .unwrap_or(1), os: std::env::consts::OS.to_string(), arch: std::env::consts::ARCH.to_string(), - host_user_id: std::env::var("III_HOST_USER_ID") - .ok() - .filter(|s| !s.is_empty()) - .or_else(find_project_ini_device_id), + host_device_id: find_host_device_id(), } } @@ -382,8 +388,8 @@ impl EnvironmentInfo { "os": self.os, "arch": self.arch, }); - if let Some(ref id) = self.host_user_id { - obj["host_user_id"] = serde_json::json!(id); + if let Some(ref id) = self.host_device_id { + obj["host_device_id"] = serde_json::json!(id); } obj } @@ -1011,4 +1017,166 @@ state: let tz = detect_timezone(); assert!(!tz.is_empty(), "timezone should not be empty"); } + + // ========================================================================= + // generate_container_device_id + // ========================================================================= + + #[test] + #[serial] + fn test_container_device_id_is_unknown_when_host_identity_missing() { + let dir = tempfile::tempdir().unwrap(); + unsafe { + env::set_var("III_PROJECT_ROOT", dir.path()); + env::remove_var("III_HOST_DEVICE_ID"); + } + let id = generate_container_device_id(); + assert_eq!(id, "unknown-container"); + unsafe { + env::remove_var("III_PROJECT_ROOT"); + } + } + + #[test] + fn test_container_device_id_for_host_and_hostname() { + assert_eq!( + container_device_id_for(Some("host-abc".to_string()), Some("box-1".to_string())), + "container-host-abc-box-1" + ); + } + + #[test] + fn test_container_device_id_for_missing_hostname_uses_unknown_hostname_suffix() { + assert_eq!( + container_device_id_for(Some("host-abc".to_string()), None), + "container-host-abc-unknown-hostname" + ); + } + + #[test] + fn test_container_device_id_for_missing_host_is_unknown_container_regardless_of_hostname() { + assert_eq!( + container_device_id_for(None, None), + "unknown-container" + ); + assert_eq!( + container_device_id_for(None, Some("box-1".to_string())), + "unknown-container" + ); + } + + #[test] + fn test_host_device_id_for_returns_machine_id_when_present() { + assert_eq!( + host_device_id_for(Some("machine-xyz".to_string())), + "machine-xyz" + ); + } + + #[test] + fn test_host_device_id_for_returns_unknown_host_when_missing() { + assert_eq!(host_device_id_for(None), "unknown-host"); + } + + #[test] + #[serial] + fn test_container_device_id_uses_project_ini_device_id() { + let dir = tempfile::tempdir().unwrap(); + let iii = dir.path().join(".iii"); + std::fs::create_dir_all(&iii).unwrap(); + std::fs::write(iii.join("project.ini"), "device_id=host-dev-abc\n").unwrap(); + unsafe { + env::set_var("III_PROJECT_ROOT", dir.path()); + env::remove_var("III_HOST_DEVICE_ID"); + } + let id = generate_container_device_id(); + assert!( + id.starts_with("container-host-dev-abc-"), + "expected host id literal in prefix, got {id}" + ); + unsafe { + env::remove_var("III_PROJECT_ROOT"); + } + } + + #[test] + #[serial] + fn test_container_device_id_is_stable_per_host_device_id() { + let dir = tempfile::tempdir().unwrap(); + unsafe { + env::set_var("III_PROJECT_ROOT", dir.path()); + env::set_var("III_HOST_DEVICE_ID", "host-dev-xyz"); + } + // Same III_HOST_DEVICE_ID + same hostname (hostname doesn't change + // within one process) → same container device_id across calls. + let a = generate_container_device_id(); + let b = generate_container_device_id(); + assert_eq!(a, b, "same host_device_id must yield same container id"); + + // Different host device_id → different container id. + unsafe { + env::set_var("III_HOST_DEVICE_ID", "host-dev-different"); + } + let c = generate_container_device_id(); + assert_ne!(a, c); + + unsafe { + env::remove_var("III_PROJECT_ROOT"); + env::remove_var("III_HOST_DEVICE_ID"); + } + } + + #[test] + #[serial] + fn test_container_device_id_uses_host_device_id_env() { + let dir = tempfile::tempdir().unwrap(); + unsafe { + env::set_var("III_PROJECT_ROOT", dir.path()); + env::set_var("III_HOST_DEVICE_ID", "host-dev-env"); + } + let id = generate_container_device_id(); + assert!( + id.starts_with("container-host-dev-env-"), + "expected host id literal in prefix, got {id}" + ); + unsafe { + env::remove_var("III_PROJECT_ROOT"); + env::remove_var("III_HOST_DEVICE_ID"); + } + } + + #[test] + #[serial] + fn test_find_host_device_id_env_beats_project_ini() { + let dir = tempfile::tempdir().unwrap(); + let iii = dir.path().join(".iii"); + std::fs::create_dir_all(&iii).unwrap(); + std::fs::write(iii.join("project.ini"), "device_id=from-ini\n").unwrap(); + unsafe { + env::set_var("III_PROJECT_ROOT", dir.path()); + env::set_var("III_HOST_DEVICE_ID", "from-env"); + } + assert_eq!(find_host_device_id().as_deref(), Some("from-env")); + unsafe { + env::remove_var("III_PROJECT_ROOT"); + env::remove_var("III_HOST_DEVICE_ID"); + } + } + + #[test] + #[serial] + fn test_find_host_device_id_falls_back_to_project_ini() { + let dir = tempfile::tempdir().unwrap(); + let iii = dir.path().join(".iii"); + std::fs::create_dir_all(&iii).unwrap(); + std::fs::write(iii.join("project.ini"), "device_id=from-ini\n").unwrap(); + unsafe { + env::set_var("III_PROJECT_ROOT", dir.path()); + env::remove_var("III_HOST_DEVICE_ID"); + } + assert_eq!(find_host_device_id().as_deref(), Some("from-ini")); + unsafe { + env::remove_var("III_PROJECT_ROOT"); + } + } } diff --git a/engine/src/workers/telemetry/mod.rs b/engine/src/workers/telemetry/mod.rs index fd49009e5..40880cbb6 100644 --- a/engine/src/workers/telemetry/mod.rs +++ b/engine/src/workers/telemetry/mod.rs @@ -10,7 +10,7 @@ pub mod environment; use std::collections::{HashMap, HashSet}; use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Instant; use async_trait::async_trait; @@ -128,7 +128,10 @@ fn read_project_ini(root: &std::path::Path) -> Option { fn resolve_project_context( sdk_telemetry: Option<&WorkerConnectionTelemetryMeta>, ) -> ProjectContext { - let ini_data = find_project_root().and_then(|root| read_project_ini(&root)); + let project_root = find_project_root(); + let ini_data = project_root + .as_ref() + .and_then(|root| read_project_ini(root)); let project_id = ini_data .as_ref() @@ -142,7 +145,9 @@ fn resolve_project_context( let project_name = ini_data .as_ref() .and_then(|d| d.project_name.clone()) - .or_else(|| sdk_telemetry.and_then(|t| t.project_name.clone())); + .or_else(|| sdk_telemetry.and_then(|t| t.project_name.clone())) + .or_else(|| directory_basename(project_root.as_deref())) + .or_else(|| directory_basename(std::env::current_dir().ok().as_deref())); let source = ini_data.as_ref().and_then(|d| d.source.clone()); @@ -153,6 +158,14 @@ fn resolve_project_context( } } +fn directory_basename(path: Option<&std::path::Path>) -> Option { + path? + .file_name() + .and_then(|n| n.to_str()) + .map(|s| s.to_string()) + .filter(|s| !s.is_empty()) +} + fn get_or_create_device_id() -> String { environment::get_or_create_device_id() } @@ -273,123 +286,309 @@ fn build_base_properties(snap: &EngineSnapshot) -> serde_json::Map Self { -// Self { -// invocations_total: 0, -// invocations_success: 0, -// invocations_error: 0, -// api_requests: 0, -// queue_emits: 0, -// queue_consumes: 0, -// pubsub_publishes: 0, -// pubsub_subscribes: 0, -// cron_executions: 0, -// } -// } -// -// fn snapshot(&mut self) -> DeltaSnapshot { -// use std::sync::atomic::Ordering; -// let acc = crate::workers::observability::metrics::get_metrics_accumulator(); -// let col = collector(); -// -// let cur = DeltaAccumulator { -// invocations_total: acc.invocations_total.load(Ordering::Relaxed), -// invocations_success: acc.invocations_success.load(Ordering::Relaxed), -// invocations_error: acc.invocations_error.load(Ordering::Relaxed), -// api_requests: col.api_requests.load(Ordering::Relaxed), -// queue_emits: col.queue_emits.load(Ordering::Relaxed), -// queue_consumes: col.queue_consumes.load(Ordering::Relaxed), -// pubsub_publishes: col.pubsub_publishes.load(Ordering::Relaxed), -// pubsub_subscribes: col.pubsub_subscribes.load(Ordering::Relaxed), -// cron_executions: col.cron_executions.load(Ordering::Relaxed), -// }; -// -// let deltas = DeltaSnapshot { -// invocations_total: cur.invocations_total.saturating_sub(self.invocations_total), -// invocations_success: cur -// .invocations_success -// .saturating_sub(self.invocations_success), -// invocations_error: cur.invocations_error.saturating_sub(self.invocations_error), -// api_requests: cur.api_requests.saturating_sub(self.api_requests), -// queue_emits: cur.queue_emits.saturating_sub(self.queue_emits), -// queue_consumes: cur.queue_consumes.saturating_sub(self.queue_consumes), -// pubsub_publishes: cur.pubsub_publishes.saturating_sub(self.pubsub_publishes), -// pubsub_subscribes: cur.pubsub_subscribes.saturating_sub(self.pubsub_subscribes), -// cron_executions: cur.cron_executions.saturating_sub(self.cron_executions), -// }; -// -// *self = cur; -// deltas -// } -// } -// -// struct DeltaSnapshot { -// invocations_total: u64, -// invocations_success: u64, -// invocations_error: u64, -// api_requests: u64, -// queue_emits: u64, -// queue_consumes: u64, -// pubsub_publishes: u64, -// pubsub_subscribes: u64, -// cron_executions: u64, -// } -// -// impl DeltaSnapshot { -// fn insert_into(&self, m: &mut serde_json::Map) { -// m.insert( -// "delta_invocations_total".into(), -// serde_json::json!(self.invocations_total), -// ); -// m.insert( -// "delta_invocations_success".into(), -// serde_json::json!(self.invocations_success), -// ); -// m.insert( -// "delta_invocations_error".into(), -// serde_json::json!(self.invocations_error), -// ); -// m.insert( -// "delta_api_requests".into(), -// serde_json::json!(self.api_requests), -// ); -// m.insert( -// "delta_queue_emits".into(), -// serde_json::json!(self.queue_emits), -// ); -// m.insert( -// "delta_queue_consumes".into(), -// serde_json::json!(self.queue_consumes), -// ); -// m.insert( -// "delta_pubsub_publishes".into(), -// serde_json::json!(self.pubsub_publishes), -// ); -// m.insert( -// "delta_pubsub_subscribes".into(), -// serde_json::json!(self.pubsub_subscribes), -// ); -// m.insert( -// "delta_cron_executions".into(), -// serde_json::json!(self.cron_executions), -// ); -// } -// } +/// Holds the previous snapshot of cumulative counters and current-state values so +/// each heartbeat can emit deltas. The state fields (function/trigger/worker counts +/// and ID sets) can decrease, so we track them explicitly instead of only relying on +/// the monotonically-increasing registration counters. +struct DeltaAccumulator { + initialized: bool, + + // Cumulative counters from the metrics accumulator / collector. Only grow. + invocations_total: u64, + invocations_success: u64, + invocations_error: u64, + api_requests: u64, + queue_emits: u64, + queue_consumes: u64, + pubsub_publishes: u64, + pubsub_subscribes: u64, + cron_executions: u64, + function_registrations: u64, + trigger_registrations: u64, + + // Current-state values. Can go up or down between heartbeats. + function_count: usize, + trigger_count: usize, + worker_count: usize, + function_ids: HashSet, + worker_ids: HashSet, +} + +impl DeltaAccumulator { + fn new() -> Self { + Self { + initialized: false, + invocations_total: 0, + invocations_success: 0, + invocations_error: 0, + api_requests: 0, + queue_emits: 0, + queue_consumes: 0, + pubsub_publishes: 0, + pubsub_subscribes: 0, + cron_executions: 0, + function_registrations: 0, + trigger_registrations: 0, + function_count: 0, + trigger_count: 0, + worker_count: 0, + function_ids: HashSet::new(), + worker_ids: HashSet::new(), + } + } + + fn snapshot(&mut self, snap: &EngineSnapshot) -> DeltaSnapshot { + use std::sync::atomic::Ordering; + let acc = crate::workers::observability::metrics::get_metrics_accumulator(); + let col = collector::collector(); + + let cur_invocations_total = acc.invocations_total.load(Ordering::Relaxed); + let cur_invocations_success = acc.invocations_success.load(Ordering::Relaxed); + let cur_invocations_error = acc.invocations_error.load(Ordering::Relaxed); + let cur_api_requests = col.api_requests.load(Ordering::Relaxed); + let cur_queue_emits = col.queue_emits.load(Ordering::Relaxed); + let cur_queue_consumes = col.queue_consumes.load(Ordering::Relaxed); + let cur_pubsub_publishes = col.pubsub_publishes.load(Ordering::Relaxed); + let cur_pubsub_subscribes = col.pubsub_subscribes.load(Ordering::Relaxed); + let cur_cron_executions = col.cron_executions.load(Ordering::Relaxed); + let cur_function_registrations = col.function_registrations.load(Ordering::Relaxed); + let cur_trigger_registrations = col.trigger_registrations.load(Ordering::Relaxed); + + let cur_function_count = snap.ft.function_count; + let cur_trigger_count = snap.ft.trigger_count; + let cur_worker_count = snap.wd.worker_count_total; + let cur_function_ids: HashSet = snap.ft.functions.iter().cloned().collect(); + let cur_worker_ids: HashSet = snap.wd.workers.iter().cloned().collect(); + + // On the very first snapshot there is no prior state to diff against. Emit + // cumulative deltas (the counters reflect activity since process start) but + // keep the state-change deltas at zero so session start does not masquerade + // as a development event in Amplitude cohorts. + let first_snapshot = !self.initialized; + let delta = if first_snapshot { + DeltaSnapshot { + first_snapshot: true, + invocations_total: cur_invocations_total, + invocations_success: cur_invocations_success, + invocations_error: cur_invocations_error, + api_requests: cur_api_requests, + queue_emits: cur_queue_emits, + queue_consumes: cur_queue_consumes, + pubsub_publishes: cur_pubsub_publishes, + pubsub_subscribes: cur_pubsub_subscribes, + cron_executions: cur_cron_executions, + function_registrations: cur_function_registrations, + trigger_registrations: cur_trigger_registrations, + delta_function_count: 0, + delta_trigger_count: 0, + delta_worker_count: 0, + functions_added: Vec::new(), + functions_removed: Vec::new(), + workers_added: Vec::new(), + workers_removed: Vec::new(), + } + } else { + let functions_added: Vec = cur_function_ids + .difference(&self.function_ids) + .cloned() + .collect(); + let functions_removed: Vec = self + .function_ids + .difference(&cur_function_ids) + .cloned() + .collect(); + let workers_added: Vec = cur_worker_ids + .difference(&self.worker_ids) + .cloned() + .collect(); + let workers_removed: Vec = self + .worker_ids + .difference(&cur_worker_ids) + .cloned() + .collect(); + + DeltaSnapshot { + first_snapshot: false, + invocations_total: cur_invocations_total.saturating_sub(self.invocations_total), + invocations_success: cur_invocations_success + .saturating_sub(self.invocations_success), + invocations_error: cur_invocations_error.saturating_sub(self.invocations_error), + api_requests: cur_api_requests.saturating_sub(self.api_requests), + queue_emits: cur_queue_emits.saturating_sub(self.queue_emits), + queue_consumes: cur_queue_consumes.saturating_sub(self.queue_consumes), + pubsub_publishes: cur_pubsub_publishes.saturating_sub(self.pubsub_publishes), + pubsub_subscribes: cur_pubsub_subscribes.saturating_sub(self.pubsub_subscribes), + cron_executions: cur_cron_executions.saturating_sub(self.cron_executions), + function_registrations: cur_function_registrations + .saturating_sub(self.function_registrations), + trigger_registrations: cur_trigger_registrations + .saturating_sub(self.trigger_registrations), + delta_function_count: (cur_function_count as i64) - (self.function_count as i64), + delta_trigger_count: (cur_trigger_count as i64) - (self.trigger_count as i64), + delta_worker_count: (cur_worker_count as i64) - (self.worker_count as i64), + functions_added, + functions_removed, + workers_added, + workers_removed, + } + }; + + self.invocations_total = cur_invocations_total; + self.invocations_success = cur_invocations_success; + self.invocations_error = cur_invocations_error; + self.api_requests = cur_api_requests; + self.queue_emits = cur_queue_emits; + self.queue_consumes = cur_queue_consumes; + self.pubsub_publishes = cur_pubsub_publishes; + self.pubsub_subscribes = cur_pubsub_subscribes; + self.cron_executions = cur_cron_executions; + self.function_registrations = cur_function_registrations; + self.trigger_registrations = cur_trigger_registrations; + self.function_count = cur_function_count; + self.trigger_count = cur_trigger_count; + self.worker_count = cur_worker_count; + self.function_ids = cur_function_ids; + self.worker_ids = cur_worker_ids; + self.initialized = true; + + delta + } +} + +struct DeltaSnapshot { + // True when this snapshot is the first one taken after process start. + // `is_active_developer` returns false in this case so the session_start + // heartbeat never masquerades as a development event — even when the + // accumulator has already observed non-zero function_registrations + // between boot and the first snapshot call. + first_snapshot: bool, + invocations_total: u64, + invocations_success: u64, + invocations_error: u64, + api_requests: u64, + queue_emits: u64, + queue_consumes: u64, + pubsub_publishes: u64, + pubsub_subscribes: u64, + cron_executions: u64, + function_registrations: u64, + trigger_registrations: u64, + delta_function_count: i64, + delta_trigger_count: i64, + delta_worker_count: i64, + functions_added: Vec, + functions_removed: Vec, + workers_added: Vec, + workers_removed: Vec, +} + +impl DeltaSnapshot { + /// `is_active` captures runtime activity in the period (any invocations). + /// Use this to separate idle installs from workloads actually serving traffic. + fn is_active(&self) -> bool { + self.invocations_total > 0 + } + + /// `is_active_developer` captures development activity in the period: the + /// registered function set changed (add/remove) or a function was + /// re-registered (hot reload / code edit cycle). Intentionally excludes + /// pure worker churn so SDK reconnects without code changes don't flip it. + /// + /// The first snapshot after process start always returns false: the + /// state-change deltas are zeroed (nothing to diff against) but the + /// cumulative `function_registrations` counter reflects boot-time + /// registrations, which would otherwise masquerade as code edits. + fn is_active_developer(&self) -> bool { + if self.first_snapshot { + return false; + } + !self.functions_added.is_empty() + || !self.functions_removed.is_empty() + || self.function_registrations > 0 + || self.delta_function_count != 0 + || self.delta_trigger_count != 0 + } + + fn insert_into(&self, m: &mut serde_json::Map) { + m.insert( + "delta_invocations_total".into(), + serde_json::json!(self.invocations_total), + ); + m.insert( + "delta_invocations_success".into(), + serde_json::json!(self.invocations_success), + ); + m.insert( + "delta_invocations_error".into(), + serde_json::json!(self.invocations_error), + ); + m.insert( + "delta_api_requests".into(), + serde_json::json!(self.api_requests), + ); + m.insert( + "delta_queue_emits".into(), + serde_json::json!(self.queue_emits), + ); + m.insert( + "delta_queue_consumes".into(), + serde_json::json!(self.queue_consumes), + ); + m.insert( + "delta_pubsub_publishes".into(), + serde_json::json!(self.pubsub_publishes), + ); + m.insert( + "delta_pubsub_subscribes".into(), + serde_json::json!(self.pubsub_subscribes), + ); + m.insert( + "delta_cron_executions".into(), + serde_json::json!(self.cron_executions), + ); + m.insert( + "delta_function_registrations".into(), + serde_json::json!(self.function_registrations), + ); + m.insert( + "delta_trigger_registrations".into(), + serde_json::json!(self.trigger_registrations), + ); + m.insert( + "delta_function_count".into(), + serde_json::json!(self.delta_function_count), + ); + m.insert( + "delta_trigger_count".into(), + serde_json::json!(self.delta_trigger_count), + ); + m.insert( + "delta_worker_count".into(), + serde_json::json!(self.delta_worker_count), + ); + m.insert( + "functions_added".into(), + serde_json::json!(self.functions_added), + ); + m.insert( + "functions_removed".into(), + serde_json::json!(self.functions_removed), + ); + m.insert( + "workers_added".into(), + serde_json::json!(self.workers_added), + ); + m.insert( + "workers_removed".into(), + serde_json::json!(self.workers_removed), + ); + m.insert("is_active".into(), serde_json::json!(self.is_active())); + m.insert( + "is_active_developer".into(), + serde_json::json!(self.is_active_developer()), + ); + } +} fn collect_functions_and_triggers(engine: &Engine) -> FunctionTriggerData { let functions: Vec = engine @@ -525,13 +724,8 @@ impl TelemetryContext { "iii_version": env!("CARGO_PKG_VERSION"), }); - let host_user_id = std::env::var("III_HOST_USER_ID") - .ok() - .filter(|s| !s.is_empty()) - .or_else(environment::find_project_ini_device_id); - - if let Some(id) = host_user_id { - props["host_user_id"] = serde_json::Value::String(id); + if let Some(id) = environment::find_host_device_id() { + props["host_device_id"] = serde_json::Value::String(id); } if let Some(project_id) = project.project_id { @@ -599,6 +793,7 @@ pub struct TelemetryWorker { sdk_client: Option>, ctx: TelemetryContext, start_time: Instant, + delta: Arc>, } impl TelemetryWorker { @@ -694,6 +889,7 @@ impl Worker for TelemetryWorker { sdk_client, ctx, start_time: Instant::now(), + delta: Arc::new(Mutex::new(DeltaAccumulator::new())), })) } @@ -711,10 +907,12 @@ impl Worker for TelemetryWorker { let engine = Arc::clone(&self.engine); let ctx = self.ctx.clone(); let start_time = self.start_time; + let delta = Arc::clone(&self.delta); let engine_for_started = Arc::clone(&self.engine); let client_for_started = Arc::clone(self.active_client()); let ctx_for_started = self.ctx.clone(); + let delta_for_started = Arc::clone(&self.delta); tokio::spawn(async move { let user_invocation = collector::first_user_invocation_notify().notified(); tokio::select! { @@ -749,10 +947,12 @@ impl Worker for TelemetryWorker { "uptime_secs".into(), serde_json::json!(start_time.elapsed().as_secs()), ); - // TODO: Re-enable delta metrics once more important dashboards are ready. - // let d = DeltaAccumulator::new().snapshot(); - // props.insert("is_active".into(), serde_json::json!(d.invocations_total > 0)); - // d.insert_into(&mut props); + + let d = delta_for_started + .lock() + .expect("delta accumulator mutex poisoned") + .snapshot(&snap); + d.insert_into(&mut props); let boot_heartbeat = ctx_for_started.build_event( "heartbeat", @@ -768,9 +968,6 @@ impl Worker for TelemetryWorker { interval.tick().await; - // TODO: Re-enable delta metrics once downstream dashboards are ready. - // let mut deltas = DeltaAccumulator::new(); - loop { tokio::select! { result = shutdown_rx.changed() => { @@ -781,6 +978,12 @@ impl Worker for TelemetryWorker { let mut props = build_base_properties(&snap); props.insert("uptime_secs".into(), serde_json::json!(start_time.elapsed().as_secs())); + let d = delta + .lock() + .expect("delta accumulator mutex poisoned") + .snapshot(&snap); + d.insert_into(&mut props); + let event = ctx.build_event( "engine_stopped", serde_json::Value::Object(props), @@ -797,7 +1000,6 @@ impl Worker for TelemetryWorker { } } _ = interval.tick() => { - // let d = deltas.snapshot(); let snap = collect_engine_snapshot(&engine); let mut props = build_base_properties(&snap); @@ -805,8 +1007,12 @@ impl Worker for TelemetryWorker { props.insert("worker_count_by_language".into(), serde_json::json!(snap.wd.worker_count_by_language)); props.insert("period_secs".into(), serde_json::json!(interval_secs)); props.insert("uptime_secs".into(), serde_json::json!(start_time.elapsed().as_secs())); - // props.insert("is_active".into(), serde_json::json!(d.invocations_total > 0)); - // d.insert_into(&mut props); + + let d = delta + .lock() + .expect("delta accumulator mutex poisoned") + .snapshot(&snap); + d.insert_into(&mut props); let event = ctx.build_event( "heartbeat", @@ -1001,7 +1207,7 @@ mod tests { cpu_cores: 4, os: "linux".to_string(), arch: "x86_64".to_string(), - host_user_id: None, + host_device_id: None, } } @@ -1024,6 +1230,7 @@ mod tests { env_info: make_env_info(), }, start_time: Instant::now(), + delta: Arc::new(Mutex::new(DeltaAccumulator::new())), } } @@ -1117,14 +1324,92 @@ mod tests { #[test] #[serial] - fn test_resolve_project_context_none_when_unset() { + fn test_resolve_project_context_project_id_none_when_unset() { unsafe { env::remove_var("III_PROJECT_ID"); env::remove_var("III_PROJECT_ROOT"); } let ctx = resolve_project_context(None); assert_eq!(ctx.project_id, None); - assert_eq!(ctx.project_name, None); + // project_name falls back to the cwd basename when no project.ini or + // SDK metadata is available — tested separately below. + } + + #[test] + #[serial] + fn test_resolve_project_context_falls_back_to_project_root_basename() { + let dir = tempfile::Builder::new() + .prefix("my-demo-project-") + .tempdir() + .unwrap(); + unsafe { + env::remove_var("III_PROJECT_ID"); + env::set_var("III_PROJECT_ROOT", dir.path()); + } + let ctx = resolve_project_context(None); + let expected = dir.path().file_name().unwrap().to_str().unwrap(); + assert_eq!(ctx.project_name.as_deref(), Some(expected)); + unsafe { + env::remove_var("III_PROJECT_ROOT"); + } + } + + #[test] + #[serial] + fn test_resolve_project_context_ini_beats_basename() { + let dir = tempfile::Builder::new() + .prefix("some-other-dir-") + .tempdir() + .unwrap(); + let iii_dir = dir.path().join(".iii"); + std::fs::create_dir_all(&iii_dir).unwrap(); + std::fs::write(iii_dir.join("project.ini"), "project_name=from-ini\n").unwrap(); + unsafe { + env::remove_var("III_PROJECT_ID"); + env::set_var("III_PROJECT_ROOT", dir.path()); + } + let ctx = resolve_project_context(None); + assert_eq!(ctx.project_name, Some("from-ini".to_string())); + unsafe { + env::remove_var("III_PROJECT_ROOT"); + } + } + + #[test] + #[serial] + fn test_resolve_project_context_sdk_beats_basename() { + let dir = tempfile::Builder::new() + .prefix("fallback-dir-") + .tempdir() + .unwrap(); + unsafe { + env::remove_var("III_PROJECT_ID"); + env::set_var("III_PROJECT_ROOT", dir.path()); + } + let telemetry = WorkerConnectionTelemetryMeta { + language: None, + project_name: Some("sdk-name".to_string()), + framework: None, + }; + let ctx = resolve_project_context(Some(&telemetry)); + assert_eq!(ctx.project_name, Some("sdk-name".to_string())); + unsafe { + env::remove_var("III_PROJECT_ROOT"); + } + } + + #[test] + fn test_directory_basename_returns_name() { + let path = std::path::PathBuf::from("/foo/bar/my-project"); + assert_eq!( + directory_basename(Some(&path)), + Some("my-project".to_string()) + ); + } + + #[test] + fn test_directory_basename_none_for_empty() { + assert_eq!(directory_basename(None), None); } // ========================================================================= @@ -1379,7 +1664,7 @@ mod tests { cpu_cores: 4, os: "linux".to_string(), arch: "x86_64".to_string(), - host_user_id: None, + host_device_id: None, }, }; @@ -1480,7 +1765,7 @@ mod tests { cpu_cores: 4, os: "linux".to_string(), arch: "x86_64".to_string(), - host_user_id: None, + host_device_id: None, }, }; @@ -1508,7 +1793,7 @@ mod tests { cpu_cores: 2, os: "macos".to_string(), arch: "aarch64".to_string(), - host_user_id: None, + host_device_id: None, }, }; @@ -1635,7 +1920,7 @@ mod tests { cpu_cores: 16, os: "linux".to_string(), arch: "x86_64".to_string(), - host_user_id: None, + host_device_id: None, }, }; @@ -2408,4 +2693,263 @@ mod tests { assert_eq!(TEMPLATE_POLL_INTERVAL_SECS, 3); assert_eq!(TEMPLATE_POLL_TIMEOUT_SECS, 60 * 60); } + + // ========================================================================= + // DeltaAccumulator / DeltaSnapshot + // ========================================================================= + + fn make_engine_snapshot( + functions: Vec<&str>, + triggers: Vec<(&str, &str)>, + workers: Vec<&str>, + ) -> EngineSnapshot { + let trigger_types: Vec = triggers.iter().map(|(t, _)| t.to_string()).collect(); + EngineSnapshot { + ft: FunctionTriggerData { + function_count: functions.len(), + functions: functions.iter().map(|s| s.to_string()).collect(), + trigger_count: triggers.len(), + trigger_types, + }, + wd: WorkerData { + worker_count_total: workers.len(), + worker_count_by_framework: HashMap::new(), + worker_count_by_language: HashMap::new(), + workers: workers.iter().map(|s| s.to_string()).collect(), + sdk_languages: vec![], + client_type: "iii_direct".to_string(), + sdk_telemetry: None, + }, + project: ProjectContext { + project_id: None, + project_name: None, + source: None, + }, + } + } + + #[test] + #[serial] + fn test_delta_accumulator_first_snapshot_zeros_state_deltas() { + reset_telemetry_globals(); + let mut acc = DeltaAccumulator::new(); + let snap = make_engine_snapshot( + vec!["fn::a", "fn::b"], + vec![("http", "fn::a")], + vec!["node"], + ); + + let d = acc.snapshot(&snap); + + // First snapshot: state-change deltas must be zero so session start is + // not mistaken for active development. + assert!(d.first_snapshot); + assert_eq!(d.delta_function_count, 0); + assert_eq!(d.delta_trigger_count, 0); + assert_eq!(d.delta_worker_count, 0); + assert!(d.functions_added.is_empty()); + assert!(d.functions_removed.is_empty()); + assert!(d.workers_added.is_empty()); + assert!(d.workers_removed.is_empty()); + assert!(!d.is_active_developer()); + } + + #[test] + #[serial] + fn test_delta_accumulator_first_snapshot_not_active_dev_even_with_boot_registrations() { + // CodeRabbit scenario: workers register functions during startup BEFORE + // the first telemetry snapshot fires. The cumulative + // function_registrations counter is non-zero by the time snapshot() + // runs. is_active_developer must still return false on that first + // snapshot — otherwise the session_start heartbeat masquerades as a + // code-edit cycle. + reset_telemetry_globals(); + collector::track_function_registered(); + collector::track_function_registered(); + collector::track_function_registered(); + + let mut acc = DeltaAccumulator::new(); + let snap = make_engine_snapshot(vec!["fn::a", "fn::b", "fn::c"], vec![], vec!["node"]); + let d = acc.snapshot(&snap); + + // Cumulative counter is carried through (reported as-is on first + // snapshot since there's no prior value to diff against). + assert_eq!(d.function_registrations, 3); + // ...but the session-start heartbeat must not be flagged as active + // development regardless. + assert!(d.first_snapshot); + assert!( + !d.is_active_developer(), + "first snapshot must gate is_active_developer to false even when registrations > 0" + ); + + // Subsequent snapshot with no new activity should also stay false. + let snap2 = make_engine_snapshot(vec!["fn::a", "fn::b", "fn::c"], vec![], vec!["node"]); + let d2 = acc.snapshot(&snap2); + assert!(!d2.first_snapshot); + assert_eq!(d2.function_registrations, 0); + assert!(!d2.is_active_developer()); + + // A new registration in the next window does flip the flag. + collector::track_function_registered(); + let snap3 = make_engine_snapshot(vec!["fn::a", "fn::b", "fn::c"], vec![], vec!["node"]); + let d3 = acc.snapshot(&snap3); + assert_eq!(d3.function_registrations, 1); + assert!(d3.is_active_developer()); + } + + #[test] + #[serial] + fn test_delta_accumulator_reports_added_functions() { + reset_telemetry_globals(); + let mut acc = DeltaAccumulator::new(); + let snap1 = make_engine_snapshot(vec!["fn::a"], vec![], vec![]); + acc.snapshot(&snap1); + + let snap2 = make_engine_snapshot(vec!["fn::a", "fn::b", "fn::c"], vec![], vec![]); + let d = acc.snapshot(&snap2); + + assert_eq!(d.delta_function_count, 2); + let mut added = d.functions_added.clone(); + added.sort(); + assert_eq!(added, vec!["fn::b".to_string(), "fn::c".to_string()]); + assert!(d.functions_removed.is_empty()); + assert!(d.is_active_developer()); + } + + #[test] + #[serial] + fn test_delta_accumulator_reports_removed_functions() { + reset_telemetry_globals(); + let mut acc = DeltaAccumulator::new(); + let snap1 = make_engine_snapshot(vec!["fn::a", "fn::b"], vec![], vec![]); + acc.snapshot(&snap1); + + let snap2 = make_engine_snapshot(vec!["fn::a"], vec![], vec![]); + let d = acc.snapshot(&snap2); + + assert_eq!(d.delta_function_count, -1); + assert_eq!(d.functions_removed, vec!["fn::b".to_string()]); + assert!(d.functions_added.is_empty()); + assert!(d.is_active_developer()); + } + + #[test] + #[serial] + fn test_delta_accumulator_stable_function_set_not_active_dev() { + reset_telemetry_globals(); + let mut acc = DeltaAccumulator::new(); + let snap1 = make_engine_snapshot(vec!["fn::a"], vec![], vec!["node"]); + acc.snapshot(&snap1); + + // Exact same engine state, no registrations happened. + let snap2 = make_engine_snapshot(vec!["fn::a"], vec![], vec!["node"]); + let d = acc.snapshot(&snap2); + + assert_eq!(d.delta_function_count, 0); + assert_eq!(d.delta_trigger_count, 0); + assert_eq!(d.delta_worker_count, 0); + assert_eq!(d.function_registrations, 0); + assert!(!d.is_active_developer()); + } + + #[test] + #[serial] + fn test_delta_accumulator_reregistration_flags_active_dev() { + reset_telemetry_globals(); + let mut acc = DeltaAccumulator::new(); + let snap1 = make_engine_snapshot(vec!["fn::a"], vec![], vec!["node"]); + acc.snapshot(&snap1); + + // Same function set, but the function_registrations counter bumped — + // a typical hot-reload / code-edit cycle signal. + collector::track_function_registered(); + let snap2 = make_engine_snapshot(vec!["fn::a"], vec![], vec!["node"]); + let d = acc.snapshot(&snap2); + + assert_eq!(d.delta_function_count, 0); + assert_eq!(d.function_registrations, 1); + assert!(d.is_active_developer()); + } + + #[test] + #[serial] + fn test_delta_accumulator_is_active_requires_invocations() { + reset_telemetry_globals(); + let mut acc = DeltaAccumulator::new(); + let snap1 = make_engine_snapshot(vec!["fn::a"], vec![], vec![]); + acc.snapshot(&snap1); + + // No invocations between snapshots: not active. + let snap2 = make_engine_snapshot(vec!["fn::a"], vec![], vec![]); + let d = acc.snapshot(&snap2); + assert!(!d.is_active()); + + get_metrics_accumulator() + .invocations_total + .fetch_add(3, Ordering::Relaxed); + let snap3 = make_engine_snapshot(vec!["fn::a"], vec![], vec![]); + let d2 = acc.snapshot(&snap3); + assert_eq!(d2.invocations_total, 3); + assert!(d2.is_active()); + } + + #[test] + #[serial] + fn test_delta_accumulator_worker_churn_does_not_flag_active_dev() { + reset_telemetry_globals(); + let mut acc = DeltaAccumulator::new(); + let snap1 = make_engine_snapshot(vec!["fn::a"], vec![], vec!["node"]); + acc.snapshot(&snap1); + + // SDK reconnects: worker set changed but function set is stable and no + // new registrations happened (e.g., engine reused the function entry). + let snap2 = make_engine_snapshot(vec!["fn::a"], vec![], vec!["python"]); + let d = acc.snapshot(&snap2); + + assert_eq!(d.delta_worker_count, 0); + assert_eq!(d.workers_added, vec!["python".to_string()]); + assert_eq!(d.workers_removed, vec!["node".to_string()]); + assert!( + !d.is_active_developer(), + "worker churn alone must not mark the session as active development" + ); + } + + #[test] + #[serial] + fn test_delta_snapshot_insert_into_emits_expected_keys() { + reset_telemetry_globals(); + let mut acc = DeltaAccumulator::new(); + let snap = make_engine_snapshot(vec!["fn::a"], vec![], vec![]); + let d = acc.snapshot(&snap); + + let mut m = serde_json::Map::new(); + d.insert_into(&mut m); + + for key in &[ + "delta_invocations_total", + "delta_invocations_success", + "delta_invocations_error", + "delta_api_requests", + "delta_queue_emits", + "delta_queue_consumes", + "delta_pubsub_publishes", + "delta_pubsub_subscribes", + "delta_cron_executions", + "delta_function_registrations", + "delta_trigger_registrations", + "delta_function_count", + "delta_trigger_count", + "delta_worker_count", + "functions_added", + "functions_removed", + "workers_added", + "workers_removed", + "is_active", + "is_active_developer", + ] { + assert!(m.contains_key(*key), "missing key: {key}"); + } + } }