diff --git a/omem-server/src/api/handlers/imports.rs b/omem-server/src/api/handlers/imports.rs index ffd3804..8b513bf 100644 --- a/omem-server/src/api/handlers/imports.rs +++ b/omem-server/src/api/handlers/imports.rs @@ -365,7 +365,9 @@ pub async fn cross_reconcile( }; // limit=6 to account for self appearing in results - let similar = store.vector_search(query_vec, 6, 0.85, None, None).await?; + let similar = store + .vector_search(query_vec, 6, 0.85, None, None, false) + .await?; for (candidate, score) in &similar { if candidate.id == memory.id { diff --git a/omem-server/src/api/handlers/memory.rs b/omem-server/src/api/handlers/memory.rs index c974758..9a3914f 100644 --- a/omem-server/src/api/handlers/memory.rs +++ b/omem-server/src/api/handlers/memory.rs @@ -38,6 +38,11 @@ pub struct CreateMemoryBody { pub tags: Option>, pub source: Option, pub memory_type: Option, + /// IDs of memories to mark superseded by this one. Used when consolidating + /// fragmented memories (e.g., chunked old-embedder content) into a single + /// new memory in one atomic call. + #[serde(default)] + pub replaces: Option>, } #[derive(Deserialize)] @@ -61,6 +66,8 @@ pub struct SearchQuery { pub agent_id: Option, #[serde(default)] pub check_stale: bool, + #[serde(default)] + pub include_superseded: bool, } fn default_limit() -> usize { @@ -82,6 +89,8 @@ pub struct ListQuery { pub sort: String, #[serde(default = "default_order")] pub order: String, + #[serde(default)] + pub include_superseded: bool, } fn default_sort() -> String { @@ -228,7 +237,16 @@ pub async fn create_memory( .map_err(|e| OmemError::Embedding(format!("failed to embed content: {e}")))?; let vector = vectors.into_iter().next(); - store.create(&memory, vector.as_deref()).await?; + match body.replaces.as_deref() { + Some(ids) if !ids.is_empty() => { + store + .supersede_batch(&memory, vector.as_deref(), ids) + .await?; + } + _ => { + store.create(&memory, vector.as_deref()).await?; + } + } // Fire-and-forget: check auto-share rules for the newly created memory { @@ -303,6 +321,7 @@ pub async fn search_memories( .map(|t| t.split(',').map(|s| s.trim().to_string()).collect()), source_filter: params.source.clone(), agent_id_filter: params.agent_id.clone(), + include_superseded: params.include_superseded, }; let retrieval_pipeline = RetrievalPipeline::new(store); @@ -364,6 +383,7 @@ pub async fn search_memories( }); let source_filter = params.source.clone(); let agent_id_filter = params.agent_id.clone(); + let include_superseded = params.include_superseded; let store = acc.store.clone(); let space_id = acc.space_id.clone(); let weight = acc.weight; @@ -380,6 +400,7 @@ pub async fn search_memories( tags_filter, source_filter, agent_id_filter, + include_superseded, }; let pipeline = RetrievalPipeline::new(store); let result = pipeline.search(&request).await; @@ -623,6 +644,7 @@ pub async fn list_memories( .map(|t| t.split(',').map(|s| s.trim().to_string()).collect()), memory_type: params.memory_type, state: params.state, + include_superseded: params.include_superseded, sort: params.sort, order: params.order, }; diff --git a/omem-server/src/api/handlers/sharing.rs b/omem-server/src/api/handlers/sharing.rs index cbb7c79..c2d2865 100644 --- a/omem-server/src/api/handlers/sharing.rs +++ b/omem-server/src/api/handlers/sharing.rs @@ -1591,7 +1591,7 @@ mod tests { target_store.create(©, None).await.expect("batch share"); } - let team_list = target_store.list(100, 0).await.expect("list"); + let team_list = target_store.list(100, 0, false).await.expect("list"); assert_eq!(team_list.len(), 3); } @@ -1638,7 +1638,7 @@ mod tests { .get_store("team:backend") .await .expect("team store"); - let team_list = team_store.list(100, 0).await.expect("list"); + let team_list = team_store.list(100, 0, false).await.expect("list"); assert_eq!(team_list.len(), 1); assert_eq!(team_list[0].content, "prefers vim keybindings"); } @@ -1907,7 +1907,7 @@ mod tests { .expect("share single"); } - let team_list = target_store.list(100, 0).await.expect("list"); + let team_list = target_store.list(100, 0, false).await.expect("list"); assert_eq!(team_list.len(), 1); assert!(team_list[0].content.contains("dark mode")); } diff --git a/omem-server/src/api/mod.rs b/omem-server/src/api/mod.rs index 1cb5d98..fc67b41 100644 --- a/omem-server/src/api/mod.rs +++ b/omem-server/src/api/mod.rs @@ -657,6 +657,211 @@ mod tests { assert_eq!(update_resp.status(), StatusCode::BAD_REQUEST); } + async fn create_test_memory(app: &axum::Router, api_key: &str, content: &str) -> String { + let body = format!(r#"{{"content":"{content}"}}"#); + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/v1/memories") + .header("content-type", "application/json") + .header("x-api-key", api_key) + .body(Body::from(body)) + .expect("request"), + ) + .await + .expect("response"); + let bytes = resp.into_body().collect().await.expect("body").to_bytes(); + let v: serde_json::Value = serde_json::from_slice(&bytes).expect("json"); + v["id"].as_str().expect("id").to_string() + } + + #[tokio::test] + async fn test_create_with_replaces_supersedes_old() { + let (app, _dir) = setup_app().await; + let api_key = create_test_tenant(&app).await; + + let old1 = create_test_memory(&app, &api_key, "fragment one").await; + let old2 = create_test_memory(&app, &api_key, "fragment two").await; + + let body = format!(r#"{{"content":"consolidated","replaces":["{old1}","{old2}"]}}"#); + let create_resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/v1/memories") + .header("content-type", "application/json") + .header("x-api-key", &api_key) + .body(Body::from(body)) + .expect("request"), + ) + .await + .expect("response"); + assert_eq!(create_resp.status(), StatusCode::CREATED); + + let bytes = create_resp + .into_body() + .collect() + .await + .expect("body") + .to_bytes(); + let new_mem: serde_json::Value = serde_json::from_slice(&bytes).expect("json"); + let new_id = new_mem["id"].as_str().expect("id"); + + // Old memories should now be in superseded state via direct fetch. + for old_id in [&old1, &old2] { + let get_resp = app + .clone() + .oneshot( + Request::builder() + .uri(format!("/v1/memories/{old_id}")) + .header("x-api-key", &api_key) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + assert_eq!(get_resp.status(), StatusCode::OK); + let bytes = get_resp + .into_body() + .collect() + .await + .expect("body") + .to_bytes(); + let old: serde_json::Value = serde_json::from_slice(&bytes).expect("json"); + assert_eq!(old["state"], "superseded"); + assert_eq!(old["superseded_by"], new_id); + } + } + + #[tokio::test] + async fn test_create_with_missing_replaces_returns_400() { + let (app, _dir) = setup_app().await; + let api_key = create_test_tenant(&app).await; + + let real_id = create_test_memory(&app, &api_key, "real one").await; + + // One real id, one ghost id. + let body = + format!(r#"{{"content":"consolidated","replaces":["{real_id}","ghost-id-nope"]}}"#); + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/v1/memories") + .header("content-type", "application/json") + .header("x-api-key", &api_key) + .body(Body::from(body)) + .expect("request"), + ) + .await + .expect("response"); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + + let bytes = resp.into_body().collect().await.expect("body").to_bytes(); + let err: serde_json::Value = serde_json::from_slice(&bytes).expect("json"); + let msg = err["error"]["message"].as_str().unwrap_or_default(); + assert!( + msg.contains("ghost-id-nope"), + "error should name the missing id, got: {msg}" + ); + + // The real memory must NOT have been touched. + let get_resp = app + .clone() + .oneshot( + Request::builder() + .uri(format!("/v1/memories/{real_id}")) + .header("x-api-key", &api_key) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + let bytes = get_resp + .into_body() + .collect() + .await + .expect("body") + .to_bytes(); + let v: serde_json::Value = serde_json::from_slice(&bytes).expect("json"); + assert_eq!(v["state"], "active"); + } + + #[tokio::test] + async fn test_search_excludes_superseded_by_default() { + let (app, _dir) = setup_app().await; + let api_key = create_test_tenant(&app).await; + + let old = create_test_memory(&app, &api_key, "fragment about rust programming").await; + let body = + format!(r#"{{"content":"unified rust programming notes","replaces":["{old}"]}}"#); + app.clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/v1/memories") + .header("content-type", "application/json") + .header("x-api-key", &api_key) + .body(Body::from(body)) + .expect("request"), + ) + .await + .expect("response"); + + // Default search should NOT surface the superseded old fragment. + let resp = app + .clone() + .oneshot( + Request::builder() + .uri("/v1/memories/search?q=rust&limit=20") + .header("x-api-key", &api_key) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + let bytes = resp.into_body().collect().await.expect("body").to_bytes(); + let r: serde_json::Value = serde_json::from_slice(&bytes).expect("json"); + let results = r["results"].as_array().expect("results"); + for res in results { + let id = res["memory"]["id"].as_str().unwrap_or_default(); + assert_ne!(id, old.as_str(), "superseded memory should not appear"); + } + + // With include_superseded=true, the old should resurface. + let resp_inc = app + .clone() + .oneshot( + Request::builder() + .uri("/v1/memories/search?q=rust&limit=20&include_superseded=true") + .header("x-api-key", &api_key) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + let bytes = resp_inc + .into_body() + .collect() + .await + .expect("body") + .to_bytes(); + let r: serde_json::Value = serde_json::from_slice(&bytes).expect("json"); + let results = r["results"].as_array().expect("results"); + let ids: Vec<&str> = results + .iter() + .map(|res| res["memory"]["id"].as_str().unwrap_or_default()) + .collect(); + assert!( + ids.contains(&old.as_str()), + "old should resurface with include_superseded=true, got ids: {ids:?}" + ); + } + #[tokio::test] async fn test_search_memories() { let (app, _dir) = setup_app().await; @@ -1327,7 +1532,10 @@ mod tests { .oneshot( Request::builder() .method("POST") - .uri(format!("/v1/spaces/{}/members", utf8_percent_encode(&space_id, NON_ALPHANUMERIC))) + .uri(format!( + "/v1/spaces/{}/members", + utf8_percent_encode(&space_id, NON_ALPHANUMERIC) + )) .header("content-type", "application/json") .header("x-api-key", &api_key) .body(Body::from(r#"{"user_id":"bob","role":"member"}"#)) @@ -1386,7 +1594,10 @@ mod tests { .oneshot( Request::builder() .method("DELETE") - .uri(format!("/v1/spaces/{}/members/alice", utf8_percent_encode(&space_id, NON_ALPHANUMERIC))) + .uri(format!( + "/v1/spaces/{}/members/alice", + utf8_percent_encode(&space_id, NON_ALPHANUMERIC) + )) .header("x-api-key", &api_key) .body(Body::empty()) .expect("request"), @@ -1518,7 +1729,10 @@ mod tests { .oneshot( Request::builder() .method("DELETE") - .uri(format!("/v1/spaces/{}", utf8_percent_encode(&space_id, NON_ALPHANUMERIC))) + .uri(format!( + "/v1/spaces/{}", + utf8_percent_encode(&space_id, NON_ALPHANUMERIC) + )) .header("x-api-key", &api_key) .body(Body::empty()) .expect("request"), @@ -1532,7 +1746,10 @@ mod tests { .clone() .oneshot( Request::builder() - .uri(format!("/v1/spaces/{}", utf8_percent_encode(&space_id, NON_ALPHANUMERIC))) + .uri(format!( + "/v1/spaces/{}", + utf8_percent_encode(&space_id, NON_ALPHANUMERIC) + )) .header("x-api-key", &api_key) .body(Body::empty()) .expect("request"), @@ -1578,7 +1795,10 @@ mod tests { .oneshot( Request::builder() .method("PUT") - .uri(format!("/v1/spaces/{}/members/carol", utf8_percent_encode(&space_id, NON_ALPHANUMERIC))) + .uri(format!( + "/v1/spaces/{}/members/carol", + utf8_percent_encode(&space_id, NON_ALPHANUMERIC) + )) .header("content-type", "application/json") .header("x-api-key", &api_key) .body(Body::from(r#"{"role":"admin"}"#)) diff --git a/omem-server/src/domain/types.rs b/omem-server/src/domain/types.rs index 7fa1ab0..bf85eae 100644 --- a/omem-server/src/domain/types.rs +++ b/omem-server/src/domain/types.rs @@ -45,6 +45,7 @@ pub enum MemoryState { Active, Archived, Deleted, + Superseded, } impl fmt::Display for MemoryState { @@ -53,6 +54,7 @@ impl fmt::Display for MemoryState { Self::Active => write!(f, "active"), Self::Archived => write!(f, "archived"), Self::Deleted => write!(f, "deleted"), + Self::Superseded => write!(f, "superseded"), } } } @@ -65,6 +67,7 @@ impl FromStr for MemoryState { "active" => Ok(Self::Active), "archived" => Ok(Self::Archived), "deleted" => Ok(Self::Deleted), + "superseded" => Ok(Self::Superseded), _ => Err(format!("unknown memory state: {s}")), } } diff --git a/omem-server/src/ingest/admission.rs b/omem-server/src/ingest/admission.rs index a01fe00..a1e77a8 100644 --- a/omem-server/src/ingest/admission.rs +++ b/omem-server/src/ingest/admission.rs @@ -96,7 +96,7 @@ impl AdmissionControl { let search_results = self .store - .vector_search(&query_vec, 5, 0.0, None, None) + .vector_search(&query_vec, 5, 0.0, None, None, false) .await .unwrap_or_default(); diff --git a/omem-server/src/ingest/reconciler.rs b/omem-server/src/ingest/reconciler.rs index fd08d1b..aea0100 100644 --- a/omem-server/src/ingest/reconciler.rs +++ b/omem-server/src/ingest/reconciler.rs @@ -442,6 +442,7 @@ impl Reconciler { self.min_similarity, None, None, + false, ) .await { @@ -474,7 +475,7 @@ impl Reconciler { match self .store - .fts_search(&fts_query, self.max_per_fact, None, None) + .fts_search(&fts_query, self.max_per_fact, None, None, false) .await { Ok(results) => { diff --git a/omem-server/src/lifecycle/forgetting.rs b/omem-server/src/lifecycle/forgetting.rs index a227abf..63459b7 100644 --- a/omem-server/src/lifecycle/forgetting.rs +++ b/omem-server/src/lifecycle/forgetting.rs @@ -34,7 +34,7 @@ impl AutoForgetter { } pub async fn cleanup_expired(&self) -> Result { - let memories = self.store.list(10000, 0).await?; + let memories = self.store.list(10000, 0, false).await?; let now = chrono::Utc::now(); let mut deleted_count = 0; @@ -53,7 +53,7 @@ impl AutoForgetter { } pub async fn archive_superseded(&self, max_age_days: u32) -> Result { - let memories = self.store.list(10000, 0).await?; + let memories = self.store.list(10000, 0, false).await?; let now = chrono::Utc::now(); let max_age = chrono::TimeDelta::try_days(max_age_days as i64) .unwrap_or_else(chrono::TimeDelta::zero); @@ -183,7 +183,7 @@ mod tests { "only the expired 'today' memory (5 days old) should be deleted" ); - let remaining = store.list(100, 0).await.expect("list"); + let remaining = store.list(100, 0, false).await.expect("list"); assert_eq!(remaining.len(), 2); } diff --git a/omem-server/src/profile/service.rs b/omem-server/src/profile/service.rs index f3e2bb7..02936c1 100644 --- a/omem-server/src/profile/service.rs +++ b/omem-server/src/profile/service.rs @@ -25,7 +25,7 @@ impl ProfileService { } pub async fn get_profile(&self, query: Option<&str>) -> Result { - let all_memories = self.store.list(200, 0).await?; + let all_memories = self.store.list(200, 0, false).await?; let mut static_memories: Vec<_> = all_memories .iter() @@ -71,7 +71,7 @@ impl ProfileService { Some(q) => { let results = self .store - .fts_search(q, 10, None, None) + .fts_search(q, 10, None, None, false) .await .unwrap_or_default(); Some( diff --git a/omem-server/src/retrieve/pipeline.rs b/omem-server/src/retrieve/pipeline.rs index 137d0c2..459ad31 100644 --- a/omem-server/src/retrieve/pipeline.rs +++ b/omem-server/src/retrieve/pipeline.rs @@ -23,6 +23,10 @@ pub struct SearchRequest { pub tags_filter: Option>, pub source_filter: Option, pub agent_id_filter: Option, + /// When true, surface memories whose `state` is `superseded` alongside + /// active ones. Default (false) hides them so consumers don't see the + /// stale half of a replace-and-consolidate operation. + pub include_superseded: bool, } #[derive(Debug, Clone)] @@ -188,10 +192,12 @@ impl RetrievalPipeline { let stage_start = Instant::now(); let scope = request.scope_filter.as_deref(); + let include_superseded = request.include_superseded; + let vector_fut = async { if let Some(ref qv) = request.query_vector { self.store - .vector_search(qv, fetch_limit, 0.0, scope, None) + .vector_search(qv, fetch_limit, 0.0, scope, None, include_superseded) .await } else { Ok(Vec::new()) @@ -200,7 +206,7 @@ impl RetrievalPipeline { let bm25_fut = async { self.store - .fts_search(&request.query, fetch_limit, scope, None) + .fts_search(&request.query, fetch_limit, scope, None, include_superseded) .await }; @@ -746,6 +752,7 @@ mod tests { tags_filter: None, source_filter: None, agent_id_filter: None, + include_superseded: false, }; let results = pipeline.search(&request).await.expect("search"); @@ -809,6 +816,7 @@ mod tests { tags_filter: None, source_filter: None, agent_id_filter: None, + include_superseded: false, }; let results = pipeline.search(&request).await.expect("search"); @@ -886,6 +894,7 @@ mod tests { tags_filter: None, source_filter: None, agent_id_filter: None, + include_superseded: false, }; let results = pipeline @@ -920,6 +929,7 @@ mod tests { tags_filter: None, source_filter: None, agent_id_filter: None, + include_superseded: false, }; let results = pipeline.search(&request).await.expect("search"); @@ -962,6 +972,7 @@ mod tests { tags_filter: None, source_filter: None, agent_id_filter: None, + include_superseded: false, }; let results = pipeline @@ -1184,7 +1195,10 @@ mod tests { ]; let (result, _) = RetrievalPipeline::stage_rrf_normalize(entries); - let top = result.iter().find(|e| e.memory.content == "weak-top").unwrap(); + let top = result + .iter() + .find(|e| e.memory.content == "weak-top") + .unwrap(); assert!( top.rrf_score < 0.25, "weak top result should stay below 0.25, got {}", diff --git a/omem-server/src/store/lancedb.rs b/omem-server/src/store/lancedb.rs index 9eb9ee8..5e3cd33 100644 --- a/omem-server/src/store/lancedb.rs +++ b/omem-server/src/store/lancedb.rs @@ -24,12 +24,29 @@ use crate::domain::types::{MemoryState, MemoryType, Tier}; pub const DEFAULT_VECTOR_DIM: i32 = 1024; const TABLE_NAME: &str = "memories"; +/// Default state filter for search/list operations. +/// Excludes deleted (soft-deleted) and superseded (replaced by another memory). +const DEFAULT_STATE_FILTER: &str = "state NOT IN ('deleted', 'superseded')"; + +/// State filter that includes superseded memories. +/// Use when an explicit caller wants to see historical/replaced entries. +const STATE_FILTER_KEEPING_SUPERSEDED: &str = "state != 'deleted'"; + +fn state_filter(include_superseded: bool) -> &'static str { + if include_superseded { + STATE_FILTER_KEEPING_SUPERSEDED + } else { + DEFAULT_STATE_FILTER + } +} + pub struct ListFilter { pub category: Option, pub tier: Option, pub tags: Option>, pub memory_type: Option, pub state: Option, + pub include_superseded: bool, pub sort: String, pub order: String, } @@ -42,6 +59,7 @@ impl Default for ListFilter { tags: None, memory_type: None, state: None, + include_superseded: false, sort: "created_at".to_string(), order: "desc".to_string(), } @@ -458,11 +476,13 @@ impl LanceStore { 0.0 } + /// Lists all memories that are neither deleted nor superseded. + /// Internal use; external lists should go through `list` or `list_filtered`. pub async fn list_all_active(&self) -> Result, OmemError> { let table = self.open_table().await?; let batches: Vec = table .query() - .only_if("state != 'deleted'") + .only_if(DEFAULT_STATE_FILTER) .execute() .await .map_err(|e| OmemError::Storage(format!("list all query failed: {e}")))? @@ -519,7 +539,11 @@ impl LanceStore { let table = self.open_table().await?; let batches: Vec = table .query() - .only_if(format!("id = '{}' AND state != 'deleted'", escape_sql(id))) + .only_if(format!( + "id = '{}' AND {}", + escape_sql(id), + DEFAULT_STATE_FILTER + )) .limit(1) .execute() .await @@ -570,6 +594,83 @@ impl LanceStore { Ok(()) } + /// Atomically replace `old_ids` with a new memory. + /// + /// Semantics: + /// 1. Validate that every old_id exists and is not already superseded. + /// If any fail, return `Err(OmemError::Validation(...))` listing them; + /// no writes happen. + /// 2. Insert the new memory (with its vector). + /// 3. For each old, set `state = Superseded`, `superseded_by = new.id`, + /// `invalidated_at = now`. + /// + /// Lance has no native multi-row transactions, so step 3 is best-effort + /// sequential. If a per-old update fails, the new memory remains + /// (consolidated content is preserved) but the chain is partial — the + /// surfaced error names the IDs that failed so callers can retry. + pub async fn supersede_batch( + &self, + new: &Memory, + new_vector: Option<&[f32]>, + old_ids: &[String], + ) -> Result<(), OmemError> { + if old_ids.is_empty() { + return self.create(new, new_vector).await; + } + + let mut missing = Vec::new(); + let mut already = Vec::new(); + let mut olds: Vec = Vec::with_capacity(old_ids.len()); + for id in old_ids { + match self.get_by_id(id).await? { + None => missing.push(id.clone()), + Some(m) => { + if matches!(m.state, MemoryState::Superseded) { + already.push(id.clone()); + } else { + olds.push(m); + } + } + } + } + if !missing.is_empty() || !already.is_empty() { + let mut parts = Vec::new(); + if !missing.is_empty() { + parts.push(format!("missing: [{}]", missing.join(", "))); + } + if !already.is_empty() { + parts.push(format!("already superseded: [{}]", already.join(", "))); + } + return Err(OmemError::Validation(format!( + "supersede precheck failed — {}", + parts.join("; ") + ))); + } + + self.create(new, new_vector).await?; + + let now = chrono::Utc::now().to_rfc3339(); + let mut update_failures = Vec::new(); + for mut m in olds { + m.state = MemoryState::Superseded; + m.superseded_by = Some(new.id.clone()); + m.invalidated_at = Some(now.clone()); + m.updated_at = now.clone(); + if let Err(e) = self.update(&m, None).await { + update_failures.push(format!("{}: {e}", m.id)); + } + } + if !update_failures.is_empty() { + return Err(OmemError::Storage(format!( + "new memory {} created, but failed to mark superseded: [{}]", + new.id, + update_failures.join("; ") + ))); + } + + Ok(()) + } + pub async fn soft_delete(&self, id: &str) -> Result<(), OmemError> { let memory = self .get_by_id(id) @@ -582,11 +683,16 @@ impl LanceStore { self.update(&updated, None).await } - pub async fn list(&self, limit: usize, offset: usize) -> Result, OmemError> { + pub async fn list( + &self, + limit: usize, + offset: usize, + include_superseded: bool, + ) -> Result, OmemError> { let table = self.open_table().await?; let batches: Vec = table .query() - .only_if("state != 'deleted'") + .only_if(state_filter(include_superseded)) .limit(limit + offset) .execute() .await @@ -606,6 +712,7 @@ impl LanceStore { min_score: f32, scope_filter: Option<&str>, visibility_filter: Option<&str>, + include_superseded: bool, ) -> Result, OmemError> { let table = self.open_table().await?; let mut query = table @@ -615,7 +722,7 @@ impl LanceStore { query = query.limit(limit); - let mut filter = "state != 'deleted'".to_string(); + let mut filter = state_filter(include_superseded).to_string(); if let Some(scope) = scope_filter { filter.push_str(&format!(" AND scope = '{}'", escape_sql(scope))); } @@ -651,6 +758,7 @@ impl LanceStore { limit: usize, scope_filter: Option<&str>, visibility_filter: Option<&str>, + include_superseded: bool, ) -> Result, OmemError> { let table = self.open_table().await?; @@ -662,7 +770,7 @@ impl LanceStore { .select(Select::All) .limit(limit); - let mut filter = "state != 'deleted'".to_string(); + let mut filter = state_filter(include_superseded).to_string(); if let Some(scope) = scope_filter { filter.push_str(&format!(" AND scope = '{}'", escape_sql(scope))); } @@ -691,7 +799,7 @@ impl LanceStore { } pub fn build_visibility_filter(&self, agent_id: &str, accessible_spaces: &[String]) -> String { - let mut conditions = vec!["state != 'deleted'".to_string()]; + let mut conditions = vec![DEFAULT_STATE_FILTER.to_string()]; let mut vis_conditions = vec!["visibility = 'global'".to_string()]; @@ -816,7 +924,8 @@ impl LanceStore { } let filter = format!( - "state != 'deleted' AND provenance_source_id = '{}'", + "{} AND provenance_source_id = '{}'", + DEFAULT_STATE_FILTER, escape_sql(source_memory_id) ); let batches: Vec = table @@ -834,7 +943,9 @@ impl LanceStore { pub async fn batch_soft_delete(&self, filter: &str) -> Result { let table = self.open_table().await?; - let full_filter = format!("{} AND state != 'deleted'", filter); + // Allow batch deletion of already-superseded memories too (cleanup paths + // may want to garbage-collect old replaced fragments). + let full_filter = format!("{} AND {}", filter, STATE_FILTER_KEEPING_SUPERSEDED); let batches: Vec = table .query() .only_if(&full_filter) @@ -856,7 +967,7 @@ impl LanceStore { pub async fn count_by_filter(&self, filter: &str) -> Result { let table = self.open_table().await?; - let full_filter = format!("{} AND state != 'deleted'", filter); + let full_filter = format!("{} AND {}", filter, DEFAULT_STATE_FILTER); let count = table .count_rows(Some(full_filter)) .await @@ -878,7 +989,7 @@ impl LanceStore { match &filter.state { Some(s) => conditions.push(format!("state = '{}'", escape_sql(s))), - None => conditions.push("state != 'deleted'".to_string()), + None => conditions.push(state_filter(filter.include_superseded).to_string()), } if let Some(ref cat) = filter.category { @@ -1040,7 +1151,7 @@ mod tests { query_vec[0] = 1.0; let results = store - .vector_search(&query_vec, 3, 0.0, None, None) + .vector_search(&query_vec, 3, 0.0, None, None, false) .await .unwrap(); @@ -1066,7 +1177,7 @@ mod tests { store.create_fts_index().await.unwrap(); let results = store - .fts_search("programming language", 10, None, None) + .fts_search("programming language", 10, None, None, false) .await .unwrap(); @@ -1102,13 +1213,13 @@ mod tests { store.create(&mem, None).await.unwrap(); } - let page1 = store.list(2, 0).await.unwrap(); + let page1 = store.list(2, 0, false).await.unwrap(); assert_eq!(page1.len(), 2); - let page2 = store.list(2, 2).await.unwrap(); + let page2 = store.list(2, 2, false).await.unwrap(); assert_eq!(page2.len(), 2); - let page3 = store.list(2, 4).await.unwrap(); + let page3 = store.list(2, 4, false).await.unwrap(); assert_eq!(page3.len(), 1); } @@ -1128,11 +1239,11 @@ mod tests { store_a.create(&mem_a, Some(&va)).await.unwrap(); store_b.create(&mem_b, Some(&vb)).await.unwrap(); - let list_a = store_a.list(100, 0).await.unwrap(); + let list_a = store_a.list(100, 0, false).await.unwrap(); assert_eq!(list_a.len(), 1); assert_eq!(list_a[0].tenant_id, "tenant_A"); - let list_b = store_b.list(100, 0).await.unwrap(); + let list_b = store_b.list(100, 0, false).await.unwrap(); assert_eq!(list_b.len(), 1); assert_eq!(list_b[0].tenant_id, "tenant_B"); } @@ -1255,7 +1366,7 @@ mod tests { }); let result = store.build_visibility_filter("", &[]); assert!(result.contains("visibility = 'global'")); - assert!(result.contains("state != 'deleted'")); + assert!(result.contains("state NOT IN ('deleted', 'superseded')")); assert!(!result.contains("private")); } @@ -1376,4 +1487,136 @@ mod tests { let result = store.find_by_provenance_source("some-id").await.unwrap(); assert!(result.is_empty()); } + + #[tokio::test] + async fn test_supersede_batch_marks_old_as_superseded() { + let (store, _dir) = setup().await; + let v = vec![0.1f32; DEFAULT_VECTOR_DIM as usize]; + + let old1 = make_memory("t-001", "fragment 1 of 3"); + let old2 = make_memory("t-001", "fragment 2 of 3"); + let old3 = make_memory("t-001", "fragment 3 of 3"); + store.create(&old1, Some(&v)).await.unwrap(); + store.create(&old2, Some(&v)).await.unwrap(); + store.create(&old3, Some(&v)).await.unwrap(); + + let new = make_memory("t-001", "consolidated content"); + let old_ids = vec![old1.id.clone(), old2.id.clone(), old3.id.clone()]; + store + .supersede_batch(&new, Some(&v), &old_ids) + .await + .expect("supersede should succeed"); + + for id in &old_ids { + let fetched = store + .get_by_id(id) + .await + .unwrap() + .expect("old still exists"); + assert!(matches!(fetched.state, MemoryState::Superseded)); + assert_eq!(fetched.superseded_by.as_deref(), Some(new.id.as_str())); + assert!(fetched.invalidated_at.is_some()); + } + + let new_fetched = store.get_by_id(&new.id).await.unwrap().expect("new exists"); + assert!(matches!(new_fetched.state, MemoryState::Active)); + } + + #[tokio::test] + async fn test_supersede_batch_rejects_missing_id() { + let (store, _dir) = setup().await; + let v = vec![0.1f32; DEFAULT_VECTOR_DIM as usize]; + + let real = make_memory("t-001", "existing memory"); + store.create(&real, Some(&v)).await.unwrap(); + + let new = make_memory("t-001", "consolidated"); + let old_ids = vec![real.id.clone(), "ghost-id-does-not-exist".to_string()]; + let err = store + .supersede_batch(&new, Some(&v), &old_ids) + .await + .expect_err("missing id should reject"); + + let msg = format!("{err:?}"); + assert!( + msg.contains("missing"), + "error should mention missing: {msg}" + ); + assert!( + msg.contains("ghost-id-does-not-exist"), + "error should list the ghost id: {msg}" + ); + + // No write happened — original memory unchanged, new memory not created. + let fetched = store + .get_by_id(&real.id) + .await + .unwrap() + .expect("still there"); + assert!(matches!(fetched.state, MemoryState::Active)); + assert!(store.get_by_id(&new.id).await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_supersede_batch_rejects_already_superseded() { + let (store, _dir) = setup().await; + let v = vec![0.1f32; DEFAULT_VECTOR_DIM as usize]; + + let old = make_memory("t-001", "original"); + store.create(&old, Some(&v)).await.unwrap(); + let first_new = make_memory("t-001", "first consolidation"); + store + .supersede_batch(&first_new, Some(&v), &[old.id.clone()]) + .await + .unwrap(); + + // Trying to supersede `old` again should reject. + let second_new = make_memory("t-001", "second attempt"); + let err = store + .supersede_batch(&second_new, Some(&v), &[old.id.clone()]) + .await + .expect_err("already-superseded should reject"); + + let msg = format!("{err:?}"); + assert!( + msg.contains("already superseded"), + "error should mention already-superseded: {msg}" + ); + assert!( + store.get_by_id(&second_new.id).await.unwrap().is_none(), + "second_new should NOT have been created" + ); + } + + #[tokio::test] + async fn test_default_state_filter_excludes_superseded() { + let (store, _dir) = setup().await; + let v = vec![0.1f32; DEFAULT_VECTOR_DIM as usize]; + + let old = make_memory("t-001", "to be superseded"); + let alive = make_memory("t-001", "still active"); + store.create(&old, Some(&v)).await.unwrap(); + store.create(&alive, Some(&v)).await.unwrap(); + let new = make_memory("t-001", "replacement"); + store + .supersede_batch(&new, Some(&v), &[old.id.clone()]) + .await + .unwrap(); + + // Default list excludes the superseded `old`. + let listed = store.list(100, 0, false).await.unwrap(); + let ids: Vec<&str> = listed.iter().map(|m| m.id.as_str()).collect(); + assert!(!ids.contains(&old.id.as_str()), "old should be hidden"); + assert!(ids.contains(&alive.id.as_str())); + assert!(ids.contains(&new.id.as_str())); + + // include_superseded=true surfaces it. + let listed_with = store.list(100, 0, true).await.unwrap(); + let ids_with: Vec<&str> = listed_with.iter().map(|m| m.id.as_str()).collect(); + assert!(ids_with.contains(&old.id.as_str())); + + // get_by_id always returns regardless of state (history preserved). + let direct = store.get_by_id(&old.id).await.unwrap(); + assert!(direct.is_some(), "get_by_id should still return superseded"); + } } diff --git a/omem-server/src/store/manager.rs b/omem-server/src/store/manager.rs index 0ccf8cc..efd4edf 100644 --- a/omem-server/src/store/manager.rs +++ b/omem-server/src/store/manager.rs @@ -221,11 +221,11 @@ mod tests { let mem_b = make_memory("tenant-B", "secret data for B"); store_b.create(&mem_b, None).await.expect("create in B"); - let list_a = store_a.list(100, 0).await.expect("list A"); + let list_a = store_a.list(100, 0, false).await.expect("list A"); assert_eq!(list_a.len(), 1); assert_eq!(list_a[0].content, "secret data for A"); - let list_b = store_b.list(100, 0).await.expect("list B"); + let list_b = store_b.list(100, 0, false).await.expect("list B"); assert_eq!(list_b.len(), 1); assert_eq!(list_b[0].content, "secret data for B"); } diff --git a/plugins/mcp/src/client.ts b/plugins/mcp/src/client.ts index 53ad446..22ab047 100644 --- a/plugins/mcp/src/client.ts +++ b/plugins/mcp/src/client.ts @@ -69,10 +69,13 @@ export class OmemClient { content: string, tags?: string[], source?: string, + replaces?: string[], ): Promise { + const body: Record = { content, tags, source }; + if (replaces && replaces.length > 0) body.replaces = replaces; const result = await this.request("/v1/memories", { method: "POST", - body: JSON.stringify({ content, tags, source }), + body: JSON.stringify(body), }); if (!result) throw new Error("Failed to create memory"); return result; diff --git a/plugins/mcp/src/tools.ts b/plugins/mcp/src/tools.ts index 0df5307..79fcec5 100644 --- a/plugins/mcp/src/tools.ts +++ b/plugins/mcp/src/tools.ts @@ -8,7 +8,7 @@ export function registerTools(server: McpServer, client: OmemClient): void { { title: "Store Memory", description: - "Store a new memory in omem. Use this to save important information, decisions, preferences, or context for future reference.", + "Store a new memory in omem. Use this to save important information, decisions, preferences, or context for future reference. Pass `replaces` to atomically supersede one or more existing memories with this new consolidated one (e.g. when merging chunked fragments into a single record).", inputSchema: { content: z.string().describe("The content to remember"), tags: z @@ -19,14 +19,21 @@ export function registerTools(server: McpServer, client: OmemClient): void { .string() .optional() .describe("Source identifier (e.g. 'chat', 'code-review')"), + replaces: z + .array(z.string()) + .optional() + .describe( + "Memory IDs to mark as superseded by this new one. Use when consolidating multiple fragments into a single memory. Default search hides superseded entries; get-by-id still returns them for history.", + ), }, }, - async ({ content, tags, source }) => { + async ({ content, tags, source, replaces }) => { try { const memory = await client.createMemory( content, tags ?? [], source ?? "mcp", + replaces, ); return { content: [