Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 167 additions & 17 deletions crates/harness-server/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,26 +66,15 @@ impl AppServerRuntime for CodexHarnessServer {
}

pub(crate) fn run_codex_blocks_server() -> Result<()> {
let mut codex = CodexJsonRpcChild::spawn()?;
let mut stdout = io::stdout().lock();
let mut request_id = 1_i64;
let mut thread_id: Option<String> = None;
let mut blocks_state = BlocksState::default();

let initialize_id = next_request_id(&mut request_id);
codex.send_request(
initialize_id,
"initialize",
json!({
"clientInfo": {
"name": "centaur-harness-server",
"title": null,
"version": env!("CARGO_PKG_VERSION"),
},
"capabilities": null,
}),
)?;
codex.read_response_or_forward(initialize_id, &mut stdout)?;
// The codex app-server child is spawned lazily on the first turn so the
// Slack thread env (below) is exported *before* the child exists — the
// child captures the environment at spawn and passes it to the tool
// subprocesses it runs (e.g. slack-upload).
let mut codex: Option<CodexJsonRpcChild> = None;

let stdin = io::stdin();
for raw in stdin.lock().lines() {
Expand All @@ -95,14 +84,28 @@ pub(crate) fn run_codex_blocks_server() -> Result<()> {
continue;
}

// Until the codex child is spawned (lazily, on the first turn), export
// the Slack thread env from each line's thread_key so the child — and
// the tools it runs (e.g. slack-upload) — inherit it. The thread_key is
// constant for the process, so re-exporting before spawn is idempotent.
if codex.is_none()
&& let Some(thread_key) = thread_key_from_blocks_line(trimmed)
{
export_slack_thread_env(&thread_key);
}

match parse_blocks_line_with_state(trimmed, &mut blocks_state) {
Ok(BlocksCommand::User {
input,
client_user_message_id,
model,
}) => {
let codex = match codex {
Some(ref mut codex) => codex,
None => codex.insert(spawn_codex_app_server(&mut stdout, &mut request_id)?),
};
if let Err(error) = run_codex_user_turn(
&mut codex,
codex,
&mut stdout,
&mut request_id,
&mut thread_id,
Expand Down Expand Up @@ -136,6 +139,89 @@ pub(crate) fn run_codex_blocks_server() -> Result<()> {
Ok(())
}

/// Spawn the codex app-server child and complete the `initialize` handshake.
///
/// Deferred until the first user turn so that any Slack thread environment
/// (`SLACK_CHANNEL`/`SLACK_THREAD_TS`) is exported before the child — and the
/// tool subprocesses it spawns — capture the environment.
fn spawn_codex_app_server<W: Write>(
stdout: &mut W,
request_id: &mut i64,
) -> Result<CodexJsonRpcChild> {
let mut codex = CodexJsonRpcChild::spawn()?;
let initialize_id = next_request_id(request_id);
codex.send_request(
initialize_id,
"initialize",
json!({
"clientInfo": {
"name": "centaur-harness-server",
"title": null,
"version": env!("CARGO_PKG_VERSION"),
},
"capabilities": null,
}),
)?;
codex.read_response_or_forward(initialize_id, stdout)?;
Ok(codex)
}

/// Derive `SLACK_CHANNEL`/`SLACK_THREAD_TS` from a Slack thread key and export
/// them so sandbox tools such as `slack-upload` can target the originating
/// thread. No-op for non-Slack threads.
///
/// Must be called before the codex app-server child is spawned (and thus while
/// the process is still single-threaded), so the variables are inherited by the
/// agent's tool subprocesses.
fn export_slack_thread_env(thread_key: &str) {
let Some((channel, thread_ts)) = slack_channel_and_thread(thread_key) else {
return;
};
// SAFETY: called from `run_codex_blocks_server` before the codex child or
// its stdout reader thread is spawned, so no other thread is concurrently
// reading or writing the process environment.
unsafe {
env::set_var("SLACK_CHANNEL", channel);
env::set_var("SLACK_THREAD_TS", thread_ts);
}
}

/// Parse the `(channel, thread_ts)` pair out of a Slack thread key, supporting
/// both the api-rs shape `slack:<channel>:<thread_ts>` and the legacy api shape
/// `slack:<team>:<channel>:<thread_ts>`. Returns `None` for non-Slack keys,
/// unexpected arities, or empty components.
fn slack_channel_and_thread(thread_key: &str) -> Option<(String, String)> {
let parts: Vec<&str> = thread_key.split(':').collect();
if parts.first() != Some(&"slack") || !matches!(parts.len(), 3 | 4) {
return None;
}
let channel = parts[parts.len() - 2];
let thread_ts = parts[parts.len() - 1];
if channel.is_empty() || thread_ts.is_empty() {
return None;
}
Some((channel.to_string(), thread_ts.to_string()))
}

/// Pull the `thread_key` field out of a raw blocks input line without consuming
/// the richer `parse_blocks_line_with_state` parse. Returns `None` when the
/// field is absent, blank, or the line is not valid JSON.
fn thread_key_from_blocks_line(line: &str) -> Option<String> {
#[derive(serde::Deserialize)]
struct ThreadKeyProbe {
#[serde(default)]
thread_key: Option<String>,
}
let probe: ThreadKeyProbe = serde_json::from_str(line).ok()?;
let key = probe.thread_key?;
let trimmed = key.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
}

fn run_codex_user_turn<W: Write>(
codex: &mut CodexJsonRpcChild,
stdout: &mut W,
Expand Down Expand Up @@ -443,3 +529,67 @@ fn codex_supports_stdio_listen(bin: &str) -> bool {
let stderr = String::from_utf8_lossy(&output.stderr);
stdout.contains("--listen") || stderr.contains("--listen")
}

#[cfg(test)]
mod tests {
use super::{slack_channel_and_thread, thread_key_from_blocks_line};

#[test]
fn slack_channel_and_thread_parses_api_rs_three_part_key() {
assert_eq!(
slack_channel_and_thread("slack:C123:123.456"),
Some(("C123".to_string(), "123.456".to_string()))
);
}

#[test]
fn slack_channel_and_thread_parses_legacy_four_part_key() {
assert_eq!(
slack_channel_and_thread("slack:T999:C123:123.456"),
Some(("C123".to_string(), "123.456".to_string()))
);
}

#[test]
fn slack_channel_and_thread_ignores_non_slack_keys() {
assert_eq!(slack_channel_and_thread("web:t1"), None);
}

#[test]
fn slack_channel_and_thread_ignores_unexpected_arity() {
assert_eq!(slack_channel_and_thread("slack:C123"), None);
assert_eq!(slack_channel_and_thread("slack:T:C:1.2:extra"), None);
}

#[test]
fn slack_channel_and_thread_rejects_empty_components() {
assert_eq!(slack_channel_and_thread("slack::123.456"), None);
assert_eq!(slack_channel_and_thread("slack:C123:"), None);
}

#[test]
fn thread_key_from_blocks_line_extracts_key_from_user_turn() {
let line = r#"{"type":"user","thread_key":"slack:C123:123.456","message":{"role":"user","content":[{"type":"text","text":"hi"}]}}"#;
assert_eq!(
thread_key_from_blocks_line(line).as_deref(),
Some("slack:C123:123.456")
);
}

#[test]
fn thread_key_from_blocks_line_is_none_when_absent_or_blank() {
assert_eq!(
thread_key_from_blocks_line(r#"{"type":"user","text":"hi"}"#),
None
);
assert_eq!(
thread_key_from_blocks_line(r#"{"type":"user","thread_key":" "}"#),
None
);
}

#[test]
fn thread_key_from_blocks_line_is_none_for_invalid_json() {
assert_eq!(thread_key_from_blocks_line("not json"), None);
}
}
58 changes: 51 additions & 7 deletions crates/harness-server/tests/app_server_stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,7 @@ fn fake_amp_blocks_mode_accepts_user_blocks_by_default() {
fn fake_codex_blocks_mode_spawns_app_server_and_translates_user_blocks() {
let fake_codex = temp_path("fake-codex.sh");
let fake_codex_log = temp_path("fake-codex-requests.jsonl");
let script = fake_codex_app_server_script(&fake_codex_log);
std::fs::write(&fake_codex, script).expect("write fake codex script");
let mut permissions = std::fs::metadata(&fake_codex)
.expect("fake codex metadata")
.permissions();
permissions.set_mode(0o755);
std::fs::set_permissions(&fake_codex, permissions).expect("chmod fake codex script");
write_executable_script(&fake_codex, &fake_codex_app_server_script(&fake_codex_log));

let mut bridge = BridgeProcess::spawn_harness_blocks(
Harness::Codex,
Expand Down Expand Up @@ -246,6 +240,43 @@ fn fake_codex_blocks_mode_spawns_app_server_and_translates_user_blocks() {
let _ = std::fs::remove_file(fake_codex_log);
}

#[test]
fn codex_blocks_mode_exports_slack_thread_env_before_spawning_app_server() {
let fake_codex = temp_path("fake-codex-slack-env.sh");
let fake_codex_log = temp_path("fake-codex-slack-env-requests.jsonl");
let env_log = PathBuf::from(format!("{}.env", fake_codex_log.display()));
write_executable_script(&fake_codex, &fake_codex_app_server_script(&fake_codex_log));

let mut bridge = BridgeProcess::spawn_harness_blocks(
Harness::Codex,
None,
Some((
"CODEX_BIN",
fake_codex.to_str().expect("utf-8 fake codex path"),
)),
);
// run_blocks_user_turn sends thread_key "slack:C123:123.456".
let turn = bridge.run_blocks_user_turn("say codex blocks", Duration::from_secs(10));
bridge.finish_successfully();
assert_completed_turn(&turn);

let recorded = std::fs::read_to_string(&env_log).unwrap_or_else(|error| {
panic!("fake codex did not record its env at {env_log:?}: {error}")
});
assert!(
recorded.contains("SLACK_CHANNEL=C123"),
"codex app-server should inherit SLACK_CHANNEL from the Slack thread_key; recorded={recorded:?}"
);
assert!(
recorded.contains("SLACK_THREAD_TS=123.456"),
"codex app-server should inherit SLACK_THREAD_TS from the Slack thread_key; recorded={recorded:?}"
);

let _ = std::fs::remove_file(fake_codex);
let _ = std::fs::remove_file(fake_codex_log);
let _ = std::fs::remove_file(env_log);
}

#[test]
fn fake_amp_final_only_assistant_message_is_chunked_into_codex_deltas() {
let expected = expected_long_text();
Expand Down Expand Up @@ -1345,6 +1376,10 @@ if [ "${1:-}" != "app-server" ]; then
exit 64
fi

# Record the Slack thread env the harness exported before spawning us, so tests
# can assert SLACK_CHANNEL/SLACK_THREAD_TS reach codex (and its tool subprocesses).
printf 'SLACK_CHANNEL=%s\nSLACK_THREAD_TS=%s\n' "${SLACK_CHANNEL:-}" "${SLACK_THREAD_TS:-}" > "$log.env"

request_id() {
printf '%s' "$1" | sed -n 's/.*"id":\([0-9][0-9]*\).*/\1/p'
}
Expand Down Expand Up @@ -1391,6 +1426,15 @@ fn temp_path(name: &str) -> PathBuf {
))
}

fn write_executable_script(path: &Path, contents: &str) {
std::fs::write(path, contents).expect("write script");
let mut permissions = std::fs::metadata(path)
.expect("script metadata")
.permissions();
permissions.set_mode(0o755);
std::fs::set_permissions(path, permissions).expect("chmod script");
}

fn shell_quote(path: &Path) -> String {
let raw = path.to_string_lossy();
shell_quote_str(&raw)
Expand Down
Loading