diff --git a/Cargo.lock b/Cargo.lock index 7e17637..c18881f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,6 +59,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "agent-store" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba8b3df4afcd4972c97a317640cfa09fcb05d17f85a70e8c66daf006d08ad6d" +dependencies = [ + "anyhow", + "blake3", + "rusqlite", + "serde", + "thiserror 2.0.18", +] + [[package]] name = "ahash" version = "0.8.12" @@ -1499,6 +1512,7 @@ version = "0.1.0" dependencies = [ "agent-bridle-core", "agent-bridle-tool-web", + "agent-store", "anyhow", "async-trait", "blake3", diff --git a/Cargo.toml b/Cargo.toml index 9df69f7..5a0620c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,9 @@ repository = "https://github.com/hartsock/modulex-mcp" [workspace.dependencies] modulex-core = { path = "crates/modulex-core", version = "=0.1.0" } +# agent-store: causal-store substrate (Generation counter, SqliteBackend). +agent-store = "0.1" + agent-bridle-core = "0.1.0" agent-bridle-tool-web = "0.1.0" anyhow = "1.0" diff --git a/crates/modulex-core/Cargo.toml b/crates/modulex-core/Cargo.toml index 480ef57..5af47a7 100644 --- a/crates/modulex-core/Cargo.toml +++ b/crates/modulex-core/Cargo.toml @@ -20,6 +20,7 @@ test-support = [] [dependencies] agent-bridle-core = { workspace = true } agent-bridle-tool-web = { workspace = true, optional = true } +agent-store = { workspace = true } anyhow = { workspace = true } blake3 = { workspace = true, optional = true } rusqlite = { workspace = true } diff --git a/crates/modulex-core/src/config.rs b/crates/modulex-core/src/config.rs index a6241e9..704cf62 100644 --- a/crates/modulex-core/src/config.rs +++ b/crates/modulex-core/src/config.rs @@ -184,12 +184,17 @@ pub struct McpConfig { pub deny: Vec, } -/// Agent state store location (`[store]`). +/// Agent state store config (`[store]`). #[derive(Clone, Debug, Default, Deserialize)] pub struct StoreConfig { /// SQLite path; empty = `$MODULEX_STORE` → `~/.modulex/store.db`. #[serde(default)] pub path: String, + /// Backend-selection policy — a flat knob, `[store] backend = "sqlite"`. + /// Defaults to the safe, daemonless SQLite backend; change direction + /// (Postgres, later) by editing config, not code. + #[serde(flatten)] + pub policy: agent_store::StorePolicy, } /// A fixed date to count down to. @@ -367,6 +372,26 @@ pub fn expand_tilde(path: &str) -> PathBuf { mod tests { use super::*; + #[test] + fn store_backend_is_a_flat_policy_knob() { + // The agent-store StorePolicy flattens into [store] as a flat knob, + // alongside `path`, parsed via the real Config path. + let cfg = Config::from_toml( + r#" +[store] +path = "/tmp/x.db" +backend = "postgres" +"#, + ) + .unwrap(); + assert_eq!(cfg.store.path, "/tmp/x.db"); + assert_eq!(cfg.store.policy.backend, agent_store::BackendKind::Postgres); + + // Default: no backend key => the safe SQLite default. + let cfg = Config::from_toml("[store]\npath = \"/tmp/y.db\"\n").unwrap(); + assert_eq!(cfg.store.policy.backend, agent_store::BackendKind::Sqlite); + } + #[test] fn minimal_routine_parses_with_flattened_params() { let cfg = Config::from_toml( diff --git a/crates/modulex-core/src/engine.rs b/crates/modulex-core/src/engine.rs index eff281c..63fd38a 100644 --- a/crates/modulex-core/src/engine.rs +++ b/crates/modulex-core/src/engine.rs @@ -103,7 +103,7 @@ impl Engine { (!config.store.path.is_empty()).then_some(config.store.path.as_str()), home.as_deref(), ); - let store = match crate::store::Store::open(&path) { + let store = match crate::store::Store::open_with_policy(&config.store.policy, &path) { Ok(store) => Some(Arc::new(store)), Err(e) => { eprintln!( diff --git a/crates/modulex-core/src/store.rs b/crates/modulex-core/src/store.rs index e3f714a..41d5cc3 100644 --- a/crates/modulex-core/src/store.rs +++ b/crates/modulex-core/src/store.rs @@ -22,6 +22,7 @@ use std::path::{Path, PathBuf}; use std::sync::Mutex; +use agent_store::{Generation, SqliteBackend}; use rusqlite::{params, Connection, OptionalExtension}; use serde::{Deserialize, Serialize}; @@ -136,6 +137,40 @@ pub enum StoreError { /// Import payload malformed. #[error("store import: {0}")] Import(String), + /// A failure surfaced by the agent-store substrate. + #[error("store substrate: {0}")] + Substrate(String), +} + +fn substrate_err(e: agent_store::StoreError) -> StoreError { + StoreError::Substrate(e.to_string()) +} + +/// One-time migration of the engine generation counter. Pre-substrate builds +/// stored it as a string in modulex's own `meta` table; it now lives in the +/// agent-store substrate. Idempotent — only runs while the substrate counter +/// is still 0, then drops the legacy row so there is one source of truth. +fn migrate_legacy_generation( + conn: &Connection, + db: &dyn agent_store::Backend, +) -> Result<(), StoreError> { + let counter = Generation::new("last_generation"); + if counter.current(db).map_err(substrate_err)? != 0 { + return Ok(()); + } + let legacy: Option = conn + .query_row( + "SELECT value FROM meta WHERE key = 'last_generation'", + [], + |r| r.get::<_, String>(0), + ) + .optional()? + .and_then(|v| v.parse().ok()); + if let Some(generation) = legacy { + counter.set(db, generation).map_err(substrate_err)?; + conn.execute("DELETE FROM meta WHERE key = 'last_generation'", [])?; + } + Ok(()) } /// A registered reminder. @@ -355,7 +390,7 @@ pub struct StoreDump { /// through one connection (SQLite is the bottleneck anyway, and routine /// state traffic is tiny). pub struct Store { - conn: Mutex, + backend: Mutex, } impl Store { @@ -372,19 +407,43 @@ impl Store { } let conn = Connection::open(path)?; let store = Self { - conn: Mutex::new(conn), + backend: Mutex::new(SqliteBackend::from_connection(conn)), }; store.migrate()?; Ok(store) } + /// Open the store the [`StorePolicy`](agent_store::StorePolicy) selects. + /// + /// modulex's domain tables are SQLite-specific today, so the `postgres` + /// backend is reserved (Phase 2) and rejected with a clear error; the + /// caller treats that as a soft "store unavailable" and skips store-backed + /// steps. The default policy selects SQLite — i.e. [`Store::open`]. + /// + /// # Errors + /// [`StoreError`] when SQLite fails, or when the policy selects a backend + /// this build does not support yet. + pub fn open_with_policy( + policy: &agent_store::StorePolicy, + path: &Path, + ) -> Result { + match policy.backend { + agent_store::BackendKind::Sqlite => Self::open(path), + agent_store::BackendKind::Postgres => Err(StoreError::Substrate( + "store backend = \"postgres\" is reserved (Phase 2); modulex \ + supports the sqlite backend today" + .into(), + )), + } + } + /// An in-memory store (tests, ephemeral runs). /// /// # Errors /// [`StoreError`] when SQLite fails. pub fn in_memory() -> Result { let store = Self { - conn: Mutex::new(Connection::open_in_memory()?), + backend: Mutex::new(SqliteBackend::from_connection(Connection::open_in_memory()?)), }; store.migrate()?; Ok(store) @@ -407,7 +466,8 @@ impl Store { } fn migrate(&self) -> Result<(), StoreError> { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let version: i32 = conn.query_row("PRAGMA user_version", [], |r| r.get(0))?; // Version-guarded, idempotent steps: a fresh DB (v0) runs both; a v1 DB // runs only v2. `CREATE TABLE IF NOT EXISTS` keeps re-runs harmless. @@ -417,6 +477,9 @@ impl Store { if version < 2 { conn.execute_batch(MIGRATION_V2)?; } + // The engine generation counter now lives in the agent-store substrate. + Generation::ensure_schema(&*backend).map_err(substrate_err)?; + migrate_legacy_generation(conn, &*backend)?; Ok(()) } @@ -426,17 +489,10 @@ impl Store { /// stay monotonic across restarts. #[must_use] pub fn last_generation(&self) -> u64 { - let conn = self.conn.lock().expect("store lock poisoned"); - conn.query_row( - "SELECT value FROM meta WHERE key = 'last_generation'", - [], - |r| r.get::<_, String>(0), - ) - .optional() - .ok() - .flatten() - .and_then(|v| v.parse().ok()) - .unwrap_or(0) + let backend = self.backend.lock().expect("store lock poisoned"); + Generation::new("last_generation") + .current(&*backend) + .unwrap_or(0) } /// Persist the engine generation after a run. @@ -444,13 +500,10 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn set_last_generation(&self, generation: u64) -> Result<(), StoreError> { - let conn = self.conn.lock().expect("store lock poisoned"); - conn.execute( - "INSERT INTO meta (key, value) VALUES ('last_generation', ?1) - ON CONFLICT(key) DO UPDATE SET value = excluded.value", - params![generation.to_string()], - )?; - Ok(()) + let backend = self.backend.lock().expect("store lock poisoned"); + Generation::new("last_generation") + .set(&*backend, generation) + .map_err(substrate_err) } // ── reminders ────────────────────────────────────────────────────── @@ -466,7 +519,8 @@ impl Store { recurrence: Option<&str>, generation: u64, ) -> Result { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); conn.execute( "INSERT INTO reminders (text, due, recurrence, created_gen) VALUES (?1, ?2, ?3, ?4)", params![text, due, recurrence, generation], @@ -479,7 +533,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn reminders_open(&self) -> Result, StoreError> { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let mut stmt = conn.prepare( "SELECT id, text, due, recurrence, created_gen, done_gen FROM reminders WHERE done_gen IS NULL ORDER BY id", @@ -496,7 +551,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn reminder_done(&self, id: i64, generation: u64) -> Result { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let changed = conn.execute( "UPDATE reminders SET done_gen = ?2 WHERE id = ?1 AND done_gen IS NULL", params![id, generation], @@ -520,7 +576,8 @@ impl Store { generation: u64, ) -> Result { let display = display.unwrap_or("{label}: work day {n} of {total}"); - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); conn.execute( "INSERT INTO countdowns (label, start_date, end_date, total_work_days, display, created_gen) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", @@ -534,7 +591,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn countdowns_active(&self) -> Result, StoreError> { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let mut stmt = conn.prepare( "SELECT id, label, start_date, end_date, total_work_days, display, created_gen, retired_gen FROM countdowns WHERE retired_gen IS NULL ORDER BY id", @@ -550,7 +608,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn countdown_retire(&self, id: i64, generation: u64) -> Result { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let changed = conn.execute( "UPDATE countdowns SET retired_gen = ?2 WHERE id = ?1 AND retired_gen IS NULL", params![id, generation], @@ -565,7 +624,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn watch_add(&self, url: &str, note: &str, generation: u64) -> Result { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); conn.execute( "INSERT INTO watches (url, note, created_gen) VALUES (?1, ?2, ?3)", params![url, note, generation], @@ -578,7 +638,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn watches(&self) -> Result, StoreError> { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let mut stmt = conn.prepare( "SELECT id, url, note, last_hash, last_seen_gen, created_gen FROM watches ORDER BY id", )?; @@ -593,7 +654,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn watch_remove(&self, id: i64) -> Result { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); Ok(conn.execute("DELETE FROM watches WHERE id = ?1", params![id])? > 0) } @@ -602,7 +664,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn watch_seen(&self, id: i64, hash: &str, generation: u64) -> Result<(), StoreError> { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); conn.execute( "UPDATE watches SET last_hash = ?2, last_seen_gen = ?3 WHERE id = ?1", params![id, hash, generation], @@ -627,7 +690,8 @@ impl Store { generation: u64, ) -> Result<(), StoreError> { let args_json = serde_json::to_string(args).unwrap_or_else(|_| "[]".to_string()); - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); conn.execute( "INSERT INTO mcp_servers (name, command, args_json, note, created_gen) VALUES (?1, ?2, ?3, ?4, ?5) @@ -644,7 +708,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn mcp_servers(&self) -> Result, StoreError> { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let mut stmt = conn.prepare( "SELECT name, command, args_json, note, created_gen FROM mcp_servers ORDER BY name", )?; @@ -659,7 +724,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn mcp_server(&self, name: &str) -> Result, StoreError> { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let server = conn .query_row( "SELECT name, command, args_json, note, created_gen @@ -676,7 +742,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn mcp_unregister(&self, name: &str) -> Result { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); Ok(conn.execute("DELETE FROM mcp_servers WHERE name = ?1", params![name])? > 0) } @@ -689,7 +756,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn card_add(&self, input: &CardInput, generation: u64) -> Result { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let tx = conn.unchecked_transaction()?; let closed = lane_is_closed(&input.lane).then_some(generation); let existing: Option = tx @@ -765,7 +833,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn card_get(&self, id: i64) -> Result, StoreError> { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let card = conn .query_row( &format!("SELECT {CARD_COLS} FROM cards WHERE rowid_id = ?1"), @@ -775,7 +844,7 @@ impl Store { .optional()?; match card { Some(mut c) => { - c.refs = load_refs(&conn, c.id)?; + c.refs = load_refs(conn, c.id)?; Ok(Some(c)) } None => Ok(None), @@ -787,7 +856,8 @@ impl Store { /// # Errors /// [`StoreError`] on SQLite failure. pub fn card_by_card_id(&self, card_id: &str) -> Result, StoreError> { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let card = conn .query_row( &format!("SELECT {CARD_COLS} FROM cards WHERE card_id = ?1"), @@ -797,7 +867,7 @@ impl Store { .optional()?; match card { Some(mut c) => { - c.refs = load_refs(&conn, c.id)?; + c.refs = load_refs(conn, c.id)?; Ok(Some(c)) } None => Ok(None), @@ -815,7 +885,8 @@ impl Store { input: &CardInput, generation: u64, ) -> Result { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let tx = conn.unchecked_transaction()?; let closed = lane_is_closed(&input.lane).then_some(generation); let changed = tx.execute( @@ -865,7 +936,8 @@ impl Store { generation: u64, ) -> Result { let closed = lane_is_closed(lane).then_some(generation); - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let changed = match context { Some(ctx) => conn.execute( "UPDATE cards SET lane=?2, context=?3, updated_gen=?4, closed_gen=?5 @@ -947,13 +1019,14 @@ impl Store { } sql.push_str(" ORDER BY rowid_id"); - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let mut stmt = conn.prepare(&sql)?; let mut cards = stmt .query_map(rusqlite::params_from_iter(args.iter()), row_to_card)? .collect::, _>>()?; for card in &mut cards { - card.refs = load_refs(&conn, card.id)?; + card.refs = load_refs(conn, card.id)?; } Ok(cards) } @@ -969,7 +1042,8 @@ impl Store { schema_version: SCHEMA_VERSION, last_generation: self.last_generation(), reminders: { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let mut stmt = conn.prepare( "SELECT id, text, due, recurrence, created_gen, done_gen FROM reminders ORDER BY id", )?; @@ -979,7 +1053,8 @@ impl Store { rows }, countdowns: { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let mut stmt = conn.prepare( "SELECT id, label, start_date, end_date, total_work_days, display, created_gen, retired_gen FROM countdowns ORDER BY id", @@ -990,7 +1065,8 @@ impl Store { rows }, watches: { - let conn = self.conn.lock().expect("store lock poisoned"); + let backend = self.backend.lock().expect("store lock poisoned"); + let conn = backend.connection(); let mut stmt = conn.prepare( "SELECT id, url, note, last_hash, last_seen_gen, created_gen FROM watches ORDER BY id", )?; @@ -1675,14 +1751,15 @@ mod tests { assert_eq!(v, 1); let store = Store { - conn: Mutex::new(conn), + backend: Mutex::new(SqliteBackend::from_connection(conn)), }; store.migrate().unwrap(); let v: i32 = store - .conn + .backend .lock() .unwrap() + .connection() .query_row("PRAGMA user_version", [], |r| r.get(0)) .unwrap(); assert_eq!(v, 2); @@ -1703,6 +1780,52 @@ mod tests { assert_eq!(store.cards_in_lane("p1", None).unwrap().len(), 1); } + #[test] + fn legacy_generation_is_migrated_into_substrate() { + // Regression: pre-substrate builds stored the engine generation as a + // string in modulex's own `meta` table. On first open with agent-store, + // that value must carry into the substrate counter and the legacy row + // must be dropped, so there is a single source of truth. + let conn = Connection::open_in_memory().unwrap(); + conn.execute_batch(MIGRATION_V1).unwrap(); + conn.execute( + "INSERT INTO meta (key, value) VALUES ('last_generation', '7')", + [], + ) + .unwrap(); + + let store = Store { + backend: Mutex::new(SqliteBackend::from_connection(conn)), + }; + store.migrate().unwrap(); + + assert_eq!(store.last_generation(), 7, "legacy generation carried over"); + + let backend = store.backend.lock().unwrap(); + let leftover: Option = backend + .connection() + .query_row( + "SELECT value FROM meta WHERE key = 'last_generation'", + [], + |r| r.get(0), + ) + .optional() + .unwrap(); + assert_eq!(leftover, None, "legacy row dropped after migration"); + } + + #[test] + fn policy_rejects_reserved_postgres_backend() { + // Regression: selecting the reserved postgres backend must fail loudly + // (the caller turns this into a soft "store unavailable" skip), never + // silently fall back to a different store. + let policy = agent_store::StorePolicy { + backend: agent_store::BackendKind::Postgres, + }; + let result = Store::open_with_policy(&policy, std::path::Path::new("/tmp/unused.db")); + assert!(matches!(result, Err(StoreError::Substrate(_)))); + } + #[test] fn cards_export_import_round_trips() { let a = Store::in_memory().unwrap();