Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .claude/worktrees/agent-a590fa80
Submodule agent-a590fa80 added at b2a98d
1 change: 1 addition & 0 deletions .claude/worktrees/agent-a64cece9
Submodule agent-a64cece9 added at b2a98d
1 change: 1 addition & 0 deletions .claude/worktrees/agent-aa10ac9d
Submodule agent-aa10ac9d added at b2a98d
1 change: 1 addition & 0 deletions .claude/worktrees/agent-ab509a22
Submodule agent-ab509a22 added at b2a98d
1 change: 1 addition & 0 deletions .claude/worktrees/agent-ab9caea4
Submodule agent-ab9caea4 added at b2a98d
1 change: 1 addition & 0 deletions .claude/worktrees/agent-ac27c558
Submodule agent-ac27c558 added at 09188f
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions crates/icm-mcp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ const PROTOCOL_VERSION: &str = "2024-11-05";
/// Number of non-store tool calls before we nudge the agent to store.
const STORE_NUDGE_THRESHOLD: u32 = 10;

/// Maximum allowed line length (10 MB). Lines exceeding this are rejected
/// without parsing to prevent memory exhaustion.
const MAX_LINE_LEN: usize = 10 * 1024 * 1024;

/// Run the MCP server on stdio. Blocks until stdin is closed.
pub fn run_server(
store: &SqliteStore,
Expand All @@ -40,6 +44,17 @@ pub fn run_server(
continue;
}

if line.len() > MAX_LINE_LEN {
error!("line too long: {} bytes (max {MAX_LINE_LEN})", line.len());
let resp = JsonRpcResponse::err(
Value::Null,
-32600,
format!("line too long: {} bytes (max {MAX_LINE_LEN})", line.len()),
);
write_response(&mut stdout, &resp)?;
continue;
}

let msg: JsonRpcMessage = match serde_json::from_str(line) {
Ok(m) => m,
Err(e) => {
Expand Down
21 changes: 21 additions & 0 deletions crates/icm-mcp/src/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,16 @@ fn tool_memoir_create(store: &SqliteStore, args: &Value) -> ToolResult {
Some(n) => n,
None => return ToolResult::error("missing required field: name".into()),
};
if name.len() > 255 {
return ToolResult::error(format!("name too long: {} chars (max 255)", name.len()));
}
let description = get_str(args, "description").unwrap_or("");
if description.len() > 10_000 {
return ToolResult::error(format!(
"description too long: {} chars (max 10000)",
description.len()
));
}

let memoir = Memoir::new(name.into(), description.into());
match store.create_memoir(memoir) {
Expand Down Expand Up @@ -1268,10 +1277,22 @@ fn tool_memoir_add_concept(store: &SqliteStore, args: &Value) -> ToolResult {
Some(n) => n,
None => return ToolResult::error("missing required field: name".into()),
};
if name.len() > 255 {
return ToolResult::error(format!(
"concept name too long: {} chars (max 255)",
name.len()
));
}
let definition = match get_str(args, "definition") {
Some(d) => d,
None => return ToolResult::error("missing required field: definition".into()),
};
if definition.len() > 10_000 {
return ToolResult::error(format!(
"definition too long: {} chars (max 10000)",
definition.len()
));
}

let memoir = match resolve_memoir(store, memoir_name) {
Ok(m) => m,
Expand Down
134 changes: 100 additions & 34 deletions crates/icm-store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ impl SqliteStore {
}
let conn = Connection::open(path)
.map_err(|e| IcmError::Database(format!("cannot open database: {e}")))?;
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")
.map_err(db_err)?;
conn.execute_batch(
"PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON; PRAGMA busy_timeout=5000;",
)
.map_err(db_err)?;
init_db_with_dims(&conn, embedding_dims)?;
Ok(Self { conn })
}
Expand All @@ -65,36 +67,22 @@ impl SqliteStore {
/// Called automatically on recall to avoid manual `icm decay` cron.
pub fn maybe_auto_decay(&self) -> IcmResult<()> {
let now = Utc::now();
let now_str = now.to_rfc3339();

let last: Option<String> = self
// Atomic check-and-update: only one caller wins the race.
// First, try to claim the decay slot by inserting or conditionally updating.
let changed = self
.conn
.query_row(
"SELECT value FROM icm_metadata WHERE key = 'last_decay_at'",
[],
|row| row.get(0),
.execute(
"INSERT INTO icm_metadata (key, value) VALUES ('last_decay_at', ?1)
ON CONFLICT(key) DO UPDATE SET value = ?1
WHERE value IS NULL OR julianday(?1) - julianday(value) >= 1.0",
params![now_str],
)
.optional()
.map_err(db_err)?;

let should_decay = match last {
Some(ts) => {
let last_dt = DateTime::parse_from_rfc3339(&ts)
.map(|d| d.with_timezone(&Utc))
.unwrap_or_else(|_| now - chrono::Duration::hours(25));
(now - last_dt).num_hours() >= 24
}
None => true,
};

if should_decay {
if changed > 0 {
self.apply_decay(0.95)?;
self.conn
.execute(
"INSERT INTO icm_metadata (key, value) VALUES ('last_decay_at', ?1)
ON CONFLICT(key) DO UPDATE SET value = ?1",
params![now.to_rfc3339()],
)
.map_err(db_err)?;
}

Ok(())
Expand All @@ -104,7 +92,7 @@ impl SqliteStore {
ensure_sqlite_vec();
let conn = Connection::open_in_memory()
.map_err(|e| IcmError::Database(format!("cannot open in-memory db: {e}")))?;
conn.execute_batch("PRAGMA foreign_keys=ON;")
conn.execute_batch("PRAGMA foreign_keys=ON; PRAGMA busy_timeout=5000;")
.map_err(db_err)?;
init_db(&conn)?;
Ok(Self { conn })
Expand Down Expand Up @@ -218,9 +206,13 @@ const SELECT_COLS: &str = "id, created_at, updated_at, last_accessed, access_cou
///
/// This function strips special chars and wraps each token in double quotes.
fn sanitize_fts_query(query: &str) -> String {
// Limit input length to prevent abuse
// Limit input length to prevent abuse (UTF-8 safe truncation)
let query = if query.len() > 10_000 {
&query[..10_000]
let mut end = 10_000;
while end > 0 && !query.is_char_boundary(end) {
end -= 1;
}
&query[..end]
} else {
query
};
Expand Down Expand Up @@ -258,8 +250,10 @@ fn sanitize_fts_query(query: &str) -> String {
// MemoryStore impl
// ---------------------------------------------------------------------------

impl MemoryStore for SqliteStore {
fn store(&self, memory: Memory) -> IcmResult<String> {
impl SqliteStore {
/// Insert a memory into the database without transaction management.
/// Callers are responsible for wrapping this in a transaction.
fn store_inner(&self, memory: &Memory) -> IcmResult<String> {
let keywords_json = serde_json::to_string(&memory.keywords)?;
let related_json = serde_json::to_string(&memory.related_ids)?;
let st = source_type(&memory.source);
Expand Down Expand Up @@ -302,7 +296,26 @@ impl MemoryStore for SqliteStore {
.map_err(db_err)?;
}

Ok(memory.id)
Ok(memory.id.clone())
}
}

impl MemoryStore for SqliteStore {
fn store(&self, memory: Memory) -> IcmResult<String> {
self.conn
.execute_batch("BEGIN IMMEDIATE;")
.map_err(db_err)?;

match self.store_inner(&memory) {
Ok(id) => {
self.conn.execute_batch("COMMIT;").map_err(db_err)?;
Ok(id)
}
Err(e) => {
let _ = self.conn.execute_batch("ROLLBACK;");
Err(e)
}
}
}

fn get(&self, id: &str) -> IcmResult<Option<Memory>> {
Expand Down Expand Up @@ -720,7 +733,7 @@ impl MemoryStore for SqliteStore {

fn consolidate_topic(&self, topic: &str, consolidated: Memory) -> IcmResult<()> {
self.conn
.execute_batch("BEGIN TRANSACTION;")
.execute_batch("BEGIN IMMEDIATE;")
.map_err(db_err)?;

// Clean vec_memories for entries about to be deleted
Expand All @@ -744,7 +757,7 @@ impl MemoryStore for SqliteStore {
return Err(IcmError::Database(e.to_string()));
}

if let Err(e) = self.store(consolidated) {
if let Err(e) = self.store_inner(&consolidated) {
tracing::warn!(topic, error = %e, "consolidate_topic: rolling back after store failed");
let _ = self.conn.execute_batch("ROLLBACK;");
return Err(e);
Expand Down Expand Up @@ -3779,4 +3792,57 @@ mod tests {
let results = store.get_by_topic_prefix("project:*").unwrap();
assert_eq!(results.len(), 2);
}

#[test]
fn test_store_is_atomic() {
let store = test_store();
let mut mem = make_memory("atomic", "test atomicity");
mem.embedding = Some(vec![0.1; 384]);
let id = mem.id.clone();

store.store(mem).unwrap();

// Verify main table has the row
let retrieved = store.get(&id).unwrap().unwrap();
assert_eq!(retrieved.summary, "test atomicity");

// Verify vec_memories also has the row
let vec_count: i64 = store
.conn
.query_row(
"SELECT count(*) FROM vec_memories WHERE memory_id = ?1",
params![id],
|row| row.get(0),
)
.unwrap();
assert_eq!(vec_count, 1);
}

#[test]
fn test_busy_timeout_pragma() {
let store = test_store();
let timeout: i64 = store
.conn
.query_row("PRAGMA busy_timeout", [], |row| row.get(0))
.unwrap();
assert_eq!(timeout, 5000);
}

#[test]
fn test_fts_sanitize_utf8_safe() {
// Build a string with multibyte chars near the 10k boundary.
// Each emoji is 4 bytes. Fill up to just past 10_000 bytes.
let base = "a".repeat(9_998);
// Add a 4-byte emoji that straddles the 10_000 boundary
let input = format!("{base}\u{1F600}\u{1F600}"); // 9998 + 4 + 4 = 10006 bytes
assert!(input.len() > 10_000);

// This should not panic (the old code could split a UTF-8 char)
let result = sanitize_fts_query(&input);
// The result should be valid UTF-8 (it's a String, so it is by construction)
assert!(!result.is_empty());
// The truncated input should not contain partial emoji
// (9998 + 4 = 10002 > 10000, so the emoji at 9998 is excluded; end = 9998)
// Result should just be the 'a' tokens
}
}
Loading
Loading