Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
353 changes: 351 additions & 2 deletions crates/octos-cli/src/api/admin.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
//! Admin API handlers for profile and gateway management.

use std::sync::Arc;
use std::path::{Path as FsPath, PathBuf};
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::response::sse::{Event, KeepAlive, Sse};
use chrono::Utc;
use chrono::{DateTime, Local, Utc};
use futures::StreamExt;
use regex::{Regex, RegexBuilder};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncSeekExt};

use super::AppState;
use crate::profiles::{ProfileConfig, UserProfile, mask_secrets};

const DEFAULT_SERVE_LOG_TAIL_N: usize = 200;
const MAX_SERVE_LOG_TAIL_N: usize = 5_000;
const SERVE_LOG_POLL_INTERVAL: Duration = Duration::from_secs(1);

static SERVE_LOG_BEARER_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"(?i)\bBearer\s+[A-Za-z0-9_.+/=-]{12,}").unwrap());
static SERVE_LOG_QUERY_TOKEN_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"(?i)((?:[?&]|\b)(?:token|auth_token)=)[^&\s]+").unwrap());
static SERVE_LOG_API_KEY_RE: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(
r"\b(?:sk-[A-Za-z0-9_-]{12,}|ghp_[A-Za-z0-9_]{12,}|github_pat_[A-Za-z0-9_]+|glpat-[A-Za-z0-9_-]{12,}|AKIA[0-9A-Z]{16})\b",
)
.unwrap()
});

/// Basic email format validation.
pub(crate) fn validate_email(email: &str) -> Result<(), String> {
if email.len() > 254 {
Expand Down Expand Up @@ -648,6 +666,253 @@ pub async fn gateway_logs(
Ok(Sse::new(history_stream.chain(live_stream)).keep_alive(KeepAlive::default()))
}

#[derive(Debug, Clone, Deserialize)]
pub struct ServeLogsQuery {
#[serde(default)]
tail_n: Option<usize>,
#[serde(default)]
grep: Option<String>,
#[serde(default)]
since: Option<DateTime<Utc>>,
}

#[derive(Debug, Clone)]
struct ServeLogFilter {
grep: Option<Regex>,
since: Option<DateTime<Utc>>,
}

impl ServeLogFilter {
fn from_query(query: &ServeLogsQuery) -> Result<Self, String> {
let grep = match query
.grep
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty())
{
Some(pattern) => {
let anchored = format!("^(?:{pattern})$");
Some(
RegexBuilder::new(&anchored)
.case_insensitive(true)
.build()
.map_err(|error| format!("invalid grep regex: {error}"))?,
)
}
None => None,
};
Ok(Self {
grep,
since: query.since,
})
}
}

/// GET /api/admin/serve/logs — SSE stream of the main daemon log.
pub async fn serve_logs(
State(state): State<Arc<AppState>>,
Query(query): Query<ServeLogsQuery>,
) -> Result<
Sse<impl futures::Stream<Item = Result<Event, std::convert::Infallible>>>,
(StatusCode, String),
> {
let store = state.profile_store.as_ref().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"admin not configured".into(),
))?;
let filter = ServeLogFilter::from_query(&query).map_err(|error| {
(
StatusCode::BAD_REQUEST,
format!("invalid serve log query: {error}"),
)
})?;
let tail_n = query
.tail_n
.unwrap_or(DEFAULT_SERVE_LOG_TAIL_N)
.min(MAX_SERVE_LOG_TAIL_N);
let octos_home = store.octos_home_dir().to_path_buf();
let log_path = serve_log_path_for_now(&octos_home);

let replay = read_serve_log_replay(&log_path, tail_n, &filter)
.await
.map_err(|error| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to read serve log: {error}"),
)
})?;
let initial_offset = tokio::fs::metadata(&log_path)
.await
.map(|metadata| metadata.len())
.unwrap_or(0);
let history_stream = futures::stream::iter(
replay
.into_iter()
.map(|line| Ok(Event::default().data(line))),
);
let live_stream = futures::stream::unfold(
ServeLogTailState {
octos_home,
path: log_path,
offset: initial_offset,
pending: String::new(),
filter,
},
|mut state| async move {
loop {
tokio::time::sleep(SERVE_LOG_POLL_INTERVAL).await;
state.refresh_path_for_rotation();
match read_new_serve_log_lines(&mut state).await {
Ok(lines) if lines.is_empty() => continue,
Ok(lines) => {
let events = futures::stream::iter(
lines
.into_iter()
.map(|line| Ok(Event::default().data(line)))
.collect::<Vec<Result<Event, std::convert::Infallible>>>(),
);
return Some((events, state));
}
Err(error) => {
let event: Result<Event, std::convert::Infallible> = Ok(Event::default()
.event("error")
.data(format!("failed to read serve log: {error}")));
let events = futures::stream::iter(vec![event]);
return Some((events, state));
}
}
}
},
)
.flatten();

Ok(Sse::new(history_stream.chain(live_stream)).keep_alive(KeepAlive::default()))
}

#[derive(Debug, Clone)]
struct ServeLogTailState {
octos_home: PathBuf,
path: PathBuf,
offset: u64,
pending: String,
filter: ServeLogFilter,
}

impl ServeLogTailState {
fn refresh_path_for_rotation(&mut self) {
let next_path = serve_log_path_for_now(&self.octos_home);
if next_path != self.path {
self.path = next_path;
self.offset = 0;
self.pending.clear();
}
}
}

fn serve_log_path_for_now(octos_home: &FsPath) -> PathBuf {
serve_log_path_for_date(octos_home, Local::now().date_naive())
}

fn serve_log_path_for_date(octos_home: &FsPath, date: chrono::NaiveDate) -> PathBuf {
octos_home
.join("logs")
.join(format!("serve.{}.log", date.format("%Y-%m-%d")))
}

async fn read_serve_log_replay(
path: &FsPath,
tail_n: usize,
filter: &ServeLogFilter,
) -> std::io::Result<Vec<String>> {
match tokio::fs::read_to_string(path).await {
Ok(content) => Ok(filter_serve_log_lines(content.lines(), tail_n, filter)),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
Err(error) => Err(error),
}
}

async fn read_new_serve_log_lines(state: &mut ServeLogTailState) -> std::io::Result<Vec<String>> {
let mut file = match tokio::fs::File::open(&state.path).await {
Ok(file) => file,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(error) => return Err(error),
};

let len = file.metadata().await?.len();
if len < state.offset {
state.offset = 0;
state.pending.clear();
}
file.seek(std::io::SeekFrom::Start(state.offset)).await?;

let mut chunk = String::new();
file.read_to_string(&mut chunk).await?;
state.offset += chunk.len() as u64;
if chunk.is_empty() {
return Ok(Vec::new());
}

let mut text = std::mem::take(&mut state.pending);
text.push_str(&chunk);
if !text.ends_with('\n') {
if let Some((complete, pending)) = text.rsplit_once('\n') {
state.pending = pending.to_string();
text = complete.to_string();
} else {
state.pending = text;
return Ok(Vec::new());
}
}

Ok(text
.lines()
.filter(|line| serve_log_line_matches(line, &state.filter))
.map(redact_serve_log_line)
.collect())
}

fn filter_serve_log_lines<'a>(
lines: impl Iterator<Item = &'a str>,
tail_n: usize,
filter: &ServeLogFilter,
) -> Vec<String> {
let mut matches = lines
.filter(|line| serve_log_line_matches(line, filter))
.map(redact_serve_log_line)
.collect::<Vec<_>>();
if matches.len() > tail_n {
matches.drain(0..matches.len() - tail_n);
}
matches
}

fn serve_log_line_matches(line: &str, filter: &ServeLogFilter) -> bool {
if let Some(since) = filter.since {
if let Some(timestamp) = serve_log_line_timestamp(line) {
if timestamp < since {
return false;
}
}
}
filter.grep.as_ref().is_none_or(|grep| grep.is_match(line))
}

fn serve_log_line_timestamp(line: &str) -> Option<DateTime<Utc>> {
let token = line.split_whitespace().next()?;
DateTime::parse_from_rfc3339(token)
.ok()
.map(|timestamp| timestamp.with_timezone(&Utc))
}

fn redact_serve_log_line(line: &str) -> String {
let after_bearer = SERVE_LOG_BEARER_RE.replace_all(line, "Bearer [credential-redacted]");
let after_query_token =
SERVE_LOG_QUERY_TOKEN_RE.replace_all(&after_bearer, "${1}[credential-redacted]");
SERVE_LOG_API_KEY_RE
.replace_all(&after_query_token, "[credential-redacted]")
.into_owned()
}

/// GET /api/admin/profiles/:id/whatsapp/qr
pub async fn whatsapp_qr(
State(state): State<Arc<AppState>>,
Expand Down Expand Up @@ -4643,6 +4908,7 @@ mod register_flow_tests {
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncWriteExt;

#[test]
fn shell_request_deserialize_minimal() {
Expand Down Expand Up @@ -4683,6 +4949,89 @@ mod tests {
assert_eq!(MAX_SHELL_TIMEOUT, 600);
}

#[test]
fn serve_log_replay_filters_by_since_grep_and_tail() {
let query = ServeLogsQuery {
tail_n: Some(2),
grep: Some(".*error.*".into()),
since: Some(
DateTime::parse_from_rfc3339("2026-05-24T10:00:00Z")
.unwrap()
.with_timezone(&Utc),
),
};
let filter = ServeLogFilter::from_query(&query).unwrap();
let content = [
"2026-05-24T09:59:59Z ERROR too old",
"2026-05-24T10:00:00Z INFO ignored",
"2026-05-24T10:00:01Z ERROR first",
"2026-05-24T10:00:02Z error second",
"2026-05-24T10:00:03Z ERROR token=secret-token",
];

let lines = filter_serve_log_lines(content.into_iter(), query.tail_n.unwrap(), &filter);

assert_eq!(
lines,
vec![
"2026-05-24T10:00:02Z error second".to_string(),
"2026-05-24T10:00:03Z ERROR token=[credential-redacted]".to_string(),
]
);
}

#[test]
fn serve_log_redaction_masks_credentials() {
let redacted = redact_serve_log_line(
"Authorization: Bearer abcdef0123456789ABCDEF0123 url=/x?token=secret123 key=sk-testsecret12345",
);

assert!(redacted.contains("Bearer [credential-redacted]"));
assert!(redacted.contains("?token=[credential-redacted]"));
assert!(!redacted.contains("abcdef0123456789ABCDEF0123"));
assert!(!redacted.contains("secret123"));
assert!(!redacted.contains("sk-testsecret12345"));
}

#[tokio::test]
async fn serve_log_tail_reads_only_appended_complete_lines() {
let dir = tempfile::tempdir().unwrap();
let path = serve_log_path_for_date(
dir.path(),
chrono::NaiveDate::from_ymd_opt(2026, 5, 24).unwrap(),
);
tokio::fs::create_dir_all(path.parent().unwrap())
.await
.unwrap();
tokio::fs::write(&path, "2026-05-24T10:00:00Z INFO boot\n")
.await
.unwrap();
let mut state = ServeLogTailState {
octos_home: dir.path().to_path_buf(),
path,
offset: "2026-05-24T10:00:00Z INFO boot\n".len() as u64,
pending: String::new(),
filter: ServeLogFilter {
grep: None,
since: None,
},
};

tokio::fs::OpenOptions::new()
.append(true)
.open(&state.path)
.await
.unwrap()
.write_all(b"2026-05-24T10:00:01Z INFO appended\npartial")
.await
.unwrap();

let lines = read_new_serve_log_lines(&mut state).await.unwrap();

assert_eq!(lines, vec!["2026-05-24T10:00:01Z INFO appended"]);
assert_eq!(state.pending, "partial");
}

#[tokio::test]
async fn shell_echo_command() {
let req = ShellRequest {
Expand Down
1 change: 1 addition & 0 deletions crates/octos-cli/src/api/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ pub fn build_router(state: Arc<AppState>) -> Router {
)
// System metrics
.route("/api/admin/system/metrics", get(admin::system_metrics))
.route("/api/admin/serve/logs", get(admin::serve_logs))
.route("/api/admin/operator/summary", get(admin::operator_summary))
.route("/api/admin/operator/tasks", get(admin::operator_tasks))
// Monitor control
Expand Down
Loading
Loading