diff --git a/crates/data_connector/src/core.rs b/crates/data_connector/src/core.rs index 4426f3fd9..33b822697 100644 --- a/crates/data_connector/src/core.rs +++ b/crates/data_connector/src/core.rs @@ -188,6 +188,8 @@ pub struct ConversationItem { pub content: Value, pub status: Option, pub created_at: DateTime, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub item_json: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -199,6 +201,8 @@ pub struct NewConversationItem { pub role: Option, pub content: Value, pub status: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub item_json: Option, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] diff --git a/crates/data_connector/src/factory.rs b/crates/data_connector/src/factory.rs index 3407723e2..b8b4d3f53 100644 --- a/crates/data_connector/src/factory.rs +++ b/crates/data_connector/src/factory.rs @@ -299,6 +299,7 @@ mod tests { role: Some("user".to_string()), content: json!([]), status: Some("completed".to_string()), + item_json: None, }) .await .unwrap(); @@ -451,6 +452,7 @@ mod tests { role: Some("user".to_string()), content: json!([]), status: Some("completed".to_string()), + item_json: None, }) .await .unwrap(); diff --git a/crates/data_connector/src/hooked.rs b/crates/data_connector/src/hooked.rs index 8bb9cfe6b..c0acced61 100644 --- a/crates/data_connector/src/hooked.rs +++ b/crates/data_connector/src/hooked.rs @@ -808,6 +808,7 @@ mod tests { role: Some("user".to_string()), content: json!([]), status: Some("completed".to_string()), + item_json: None, }) .await .unwrap(); @@ -1401,6 +1402,7 @@ mod tests { role: Some("user".to_string()), content: json!("hello"), status: None, + item_json: None, }) .await .unwrap(); @@ -1474,6 +1476,7 @@ mod tests { role: Some("user".to_string()), content: json!("test"), status: None, + item_json: None, }) .await .unwrap(); diff --git a/crates/data_connector/src/memory.rs b/crates/data_connector/src/memory.rs index 9a3e8242d..1192ac965 100644 --- a/crates/data_connector/src/memory.rs +++ b/crates/data_connector/src/memory.rs @@ -123,6 +123,7 @@ impl ConversationItemStorage for MemoryConversationItemStorage { content: new_item.content, status: new_item.status, created_at, + item_json: None, }; self.inner.write().items.insert(id, item.clone()); Ok(item) @@ -564,6 +565,7 @@ mod tests { role: role.map(|r| r.to_string()), content, status: Some("completed".to_string()), + item_json: None, } } diff --git a/crates/data_connector/src/noop.rs b/crates/data_connector/src/noop.rs index eddfeadf8..80c0ef7c5 100644 --- a/crates/data_connector/src/noop.rs +++ b/crates/data_connector/src/noop.rs @@ -88,6 +88,7 @@ impl ConversationItemStorage for NoOpConversationItemStorage { content: item.content, status: item.status, created_at: Utc::now(), + item_json: None, }) } @@ -301,6 +302,7 @@ mod tests { role: role.map(|r| r.to_string()), content: json!([]), status: Some("completed".to_string()), + item_json: None, } } @@ -329,6 +331,7 @@ mod tests { role: Some("assistant".to_string()), content: json!(["hello"]), status: Some("completed".to_string()), + item_json: None, }; let item = store .create_item(input) diff --git a/crates/data_connector/src/oracle.rs b/crates/data_connector/src/oracle.rs index 4999a26e2..a560483c9 100644 --- a/crates/data_connector/src/oracle.rs +++ b/crates/data_connector/src/oracle.rs @@ -689,6 +689,7 @@ impl ConversationItemStorage for OracleConversationItemStorage { role, content, status, + item_json: _, } = item; let id = opt_id.unwrap_or_else(|| make_item_id(&item_type)); let created_at = Utc::now(); @@ -763,6 +764,7 @@ impl ConversationItemStorage for OracleConversationItemStorage { content, status, created_at, + item_json: None, }) } @@ -1147,6 +1149,7 @@ fn build_item_from_oracle_row( content, status, created_at, + item_json: None, }) } diff --git a/crates/data_connector/src/oracle_migrations.rs b/crates/data_connector/src/oracle_migrations.rs index 59656ace9..2cc4be5b8 100644 --- a/crates/data_connector/src/oracle_migrations.rs +++ b/crates/data_connector/src/oracle_migrations.rs @@ -62,15 +62,21 @@ const ORACLE_V11: Migration = Migration { description: "Create response_stream_chunks table", up: oracle_v11_up, }; +const ORACLE_V12: Migration = Migration { + version: 12, + description: "Add item_json column and link_id sequence for canonical payloads and ordering", + up: oracle_v12_up, +}; /// Core history-backend migrations required by the SQL response/conversation /// storage path during normal gateway startup. -pub(crate) static ORACLE_HISTORY_MIGRATIONS: [Migration; 3] = [ORACLE_V1, ORACLE_V2, ORACLE_V3]; +pub(crate) static ORACLE_HISTORY_MIGRATIONS: [Migration; 4] = + [ORACLE_V1, ORACLE_V2, ORACLE_V3, ORACLE_V12]; /// Oracle migration list. Append new migrations here. -pub(crate) static ORACLE_MIGRATIONS: [Migration; 11] = [ +pub(crate) static ORACLE_MIGRATIONS: [Migration; 12] = [ ORACLE_V1, ORACLE_V2, ORACLE_V3, ORACLE_V4, ORACLE_V5, ORACLE_V6, ORACLE_V7, ORACLE_V8, - ORACLE_V9, ORACLE_V10, ORACLE_V11, + ORACLE_V9, ORACLE_V10, ORACLE_V11, ORACLE_V12, ]; fn oracle_v1_up(schema: &SchemaConfig) -> Vec { @@ -449,6 +455,65 @@ fn oracle_v11_up(schema: &SchemaConfig) -> Vec { ] } +/// Add `item_json` column to `conversation_items` and `link_id` column + +/// monotonic sequence to `conversation_item_links`. +/// +/// `item_json` (CLOB with `IS JSON` check) holds the whole canonical OpenAI +/// item payload so reads can return the correct shape without per-type +/// reconstruction. `link_id` (NUMBER, populated via `CONV_ITEM_LINK_ID_SEQ`) +/// is a strictly increasing per-link key that gives `/v1/conversations/{id}/items` +/// deterministic ordering (replacing `added_at, item_id` which is not unique). +/// +/// Both columns are added as nullable so the migration is non-blocking on +/// large tables. +fn oracle_v12_up(schema: &SchemaConfig) -> Vec { + let si = &schema.conversation_items; + let sl = &schema.conversation_item_links; + let si_table = si.qualified_table(schema.owner.as_deref()); + let sl_table = sl.qualified_table(schema.owner.as_deref()); + + let mut stmts: Vec = Vec::new(); + + if !si.is_skipped("item_json") { + let item_json_col = si.col("item_json").to_uppercase(); + // CLOB + `IS JSON` check ensures only valid JSON is accepted + // (Oracle < 21c has no native JSON type). + // ORA-01430 = "column being added already exists in table". + stmts.push(format!( + "BEGIN EXECUTE IMMEDIATE 'ALTER TABLE {si_table} ADD ({item_json_col} CLOB CHECK ({item_json_col} IS JSON))'; \ + EXCEPTION WHEN OTHERS THEN IF SQLCODE != -1430 THEN RAISE; END IF; END;" + )); + } + + if !sl.is_skipped("link_id") { + let link_id_col = sl.col("link_id").to_uppercase(); + let seq = oracle_qualified_name(schema, "CONV_ITEM_LINK_ID_SEQ"); + + // ORA-00955 = "name is already used by an existing object". + // ORDER (vs NOORDER) makes the sequence strictly monotonic in request + // order across RAC / multi-instance deployments at the cost of a + // cross-instance lock; on single-instance Oracle (e.g. ATP serverless) + // it is a no-op. Required for the deterministic ordering goal that + // LINK_ID backs. + stmts.push(format!( + "BEGIN EXECUTE IMMEDIATE 'CREATE SEQUENCE {seq} START WITH 1 INCREMENT BY 1 NOCACHE ORDER'; \ + EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;" + )); + stmts.push(format!( + "BEGIN EXECUTE IMMEDIATE 'ALTER TABLE {sl_table} ADD ({link_id_col} NUMBER)'; \ + EXCEPTION WHEN OTHERS THEN IF SQLCODE != -1430 THEN RAISE; END IF; END;" + )); + // The UNIQUE INDEX on (conversation_id, link_id) is intentionally NOT + // created here. Creating it while LINK_ID is universally NULL caused + // every link_item INSERT past the first per-conversation to fail in + // e2e (only the first input message survived). It will be added in + // PR 3 immediately after backfilling LINK_ID with sequence values + // and promoting the column to NOT NULL. + } + + stmts +} + // ── Tests ────────────────────────────────────────────────────────────────── #[cfg(test)] @@ -462,7 +527,7 @@ mod tests { .iter() .map(|migration| migration.version) .collect(); - assert_eq!(versions, vec![1, 2, 3]); + assert_eq!(versions, vec![1, 2, 3, 12]); } #[test] @@ -680,6 +745,42 @@ mod tests { ); } + // ── v12: add item_json column and link_id sequence ───────────────────── + + #[test] + fn oracle_v12_up_adds_item_json_column_and_link_id_sequence() { + let schema = SchemaConfig { + owner: Some("OWNER".to_string()), + ..Default::default() + }; + let stmts = oracle_v12_up(&schema); + assert_eq!(stmts.len(), 3, "got: {stmts:?}"); + assert!(stmts[0].contains("ADD (ITEM_JSON CLOB") && stmts[0].contains("IS JSON")); + assert!(stmts[1].contains("CREATE SEQUENCE OWNER.CONV_ITEM_LINK_ID_SEQ")); + // Leading space distinguishes ORDER from NOORDER. Required for + // strictly monotonic NEXTVAL on RAC / multi-instance deployments. + assert!( + stmts[1].contains(" ORDER"), + "sequence must use ORDER for RAC determinism: {}", + stmts[1] + ); + assert!(stmts[2].contains("ADD (LINK_ID NUMBER)")); + } + + #[test] + fn oracle_v12_up_respects_skip_columns() { + let mut schema = SchemaConfig::default(); + schema + .conversation_items + .skip_columns + .insert("item_json".to_string()); + schema + .conversation_item_links + .skip_columns + .insert("link_id".to_string()); + assert!(oracle_v12_up(&schema).is_empty()); + } + #[test] fn oracle_v2_up_generates_plsql_drop_column() { let schema = SchemaConfig::default(); diff --git a/crates/data_connector/src/postgres.rs b/crates/data_connector/src/postgres.rs index d8d6f3aec..cb50e0944 100644 --- a/crates/data_connector/src/postgres.rs +++ b/crates/data_connector/src/postgres.rs @@ -435,6 +435,7 @@ impl ConversationItemStorage for PostgresConversationItemStorage { role, content, status, + item_json: _, } = item; let id = opt_id.unwrap_or_else(|| make_item_id(&item_type)); let created_at = Utc::now(); @@ -506,6 +507,7 @@ impl ConversationItemStorage for PostgresConversationItemStorage { content, status, created_at, + item_json: None, }) } @@ -817,6 +819,7 @@ fn build_item_from_row( content, status, created_at, + item_json: None, }) } diff --git a/crates/data_connector/src/postgres_migrations.rs b/crates/data_connector/src/postgres_migrations.rs index 077ff50e0..6da0c9589 100644 --- a/crates/data_connector/src/postgres_migrations.rs +++ b/crates/data_connector/src/postgres_migrations.rs @@ -62,14 +62,19 @@ const POSTGRES_V11: Migration = Migration { description: "Create response_stream_chunks table", up: pg_v11_up, }; +const POSTGRES_V12: Migration = Migration { + version: 12, + description: "Add item_json column and link_id sequence for canonical payloads and ordering", + up: pg_v12_up, +}; /// Core history-backend migrations required by the SQL response/conversation /// storage path during normal gateway startup. -pub(crate) static POSTGRES_HISTORY_MIGRATIONS: [Migration; 3] = - [POSTGRES_V1, POSTGRES_V2, POSTGRES_V3]; +pub(crate) static POSTGRES_HISTORY_MIGRATIONS: [Migration; 4] = + [POSTGRES_V1, POSTGRES_V2, POSTGRES_V3, POSTGRES_V12]; /// Postgres migration list. Append new migrations here. -pub(crate) static POSTGRES_MIGRATIONS: [Migration; 11] = [ +pub(crate) static POSTGRES_MIGRATIONS: [Migration; 12] = [ POSTGRES_V1, POSTGRES_V2, POSTGRES_V3, @@ -81,6 +86,7 @@ pub(crate) static POSTGRES_MIGRATIONS: [Migration; 11] = [ POSTGRES_V9, POSTGRES_V10, POSTGRES_V11, + POSTGRES_V12, ]; fn pg_v1_up(schema: &SchemaConfig) -> Vec { @@ -404,6 +410,52 @@ fn pg_v11_up(schema: &SchemaConfig) -> Vec { ] } +/// Add `item_json` column to `conversation_items` and `link_id` column + +/// monotonic sequence to `conversation_item_links`. +/// +/// `item_json` (JSONB) holds the whole canonical OpenAI item payload so reads +/// can return the correct shape without per-type reconstruction. `link_id` +/// (BIGINT, populated via `conv_item_link_id_seq`) is a strictly increasing +/// per-link key that gives `/v1/conversations/{id}/items` deterministic +/// ordering (replacing `added_at, item_id` which is not unique). +/// +/// Both columns are added as nullable so the migration is non-blocking on +/// large tables. +fn pg_v12_up(schema: &SchemaConfig) -> Vec { + let si = &schema.conversation_items; + let sl = &schema.conversation_item_links; + let si_table = si.qualified_table(schema.owner.as_deref()); + let sl_table = sl.qualified_table(schema.owner.as_deref()); + + let mut stmts: Vec = Vec::new(); + + if !si.is_skipped("item_json") { + let item_json_col = si.col("item_json"); + stmts.push(format!( + "ALTER TABLE {si_table} ADD COLUMN IF NOT EXISTS {item_json_col} JSONB" + )); + } + + if !sl.is_skipped("link_id") { + let link_id_col = sl.col("link_id"); + let seq = pg_qualified_table(schema, "conv_item_link_id_seq"); + stmts.push(format!( + "CREATE SEQUENCE IF NOT EXISTS {seq} INCREMENT BY 1 START WITH 1" + )); + stmts.push(format!( + "ALTER TABLE {sl_table} ADD COLUMN IF NOT EXISTS {link_id_col} BIGINT" + )); + // The UNIQUE INDEX on (conversation_id, link_id) is intentionally NOT + // created here. Creating it while link_id is universally NULL caused + // every link_item INSERT past the first per-conversation to fail in + // e2e (only the first input message survived). It will be added in + // PR 3 immediately after backfilling link_id with sequence values + // and promoting the column to NOT NULL. + } + + stmts +} + // ── Tests ────────────────────────────────────────────────────────────────── #[cfg(test)] @@ -417,7 +469,7 @@ mod tests { .iter() .map(|migration| migration.version) .collect(); - assert_eq!(versions, vec![1, 2, 3]); + assert_eq!(versions, vec![1, 2, 3, 12]); } #[test] @@ -763,4 +815,45 @@ mod tests { assert!(stmts[0].contains("ON DELETE CASCADE")); assert!(stmts[1].contains("stream_chunks_cleanup_idx")); } + + // ── v12: add item_json column and link_id sequence ───────────────────── + + #[test] + fn pg_v12_up_adds_item_json_column_and_link_id_sequence() { + let schema = SchemaConfig { + owner: Some("owner".to_string()), + ..Default::default() + }; + let stmts = pg_v12_up(&schema); + assert_eq!(stmts.len(), 3, "got: {stmts:?}"); + assert!( + stmts[0].contains("ADD COLUMN IF NOT EXISTS item_json JSONB"), + "got: {}", + stmts[0] + ); + assert!( + stmts[1].contains("CREATE SEQUENCE IF NOT EXISTS owner.conv_item_link_id_seq"), + "got: {}", + stmts[1] + ); + assert!( + stmts[2].contains("ADD COLUMN IF NOT EXISTS link_id BIGINT"), + "got: {}", + stmts[2] + ); + } + + #[test] + fn pg_v12_up_respects_skip_columns() { + let mut schema = SchemaConfig::default(); + schema + .conversation_items + .skip_columns + .insert("item_json".to_string()); + schema + .conversation_item_links + .skip_columns + .insert("link_id".to_string()); + assert!(pg_v12_up(&schema).is_empty()); + } } diff --git a/crates/data_connector/src/redis.rs b/crates/data_connector/src/redis.rs index 208c32286..757ff085a 100644 --- a/crates/data_connector/src/redis.rs +++ b/crates/data_connector/src/redis.rs @@ -384,6 +384,7 @@ impl RedisConversationItemStorage { content, status, created_at, + item_json: None, }) } } @@ -401,6 +402,7 @@ impl ConversationItemStorage for RedisConversationItemStorage { role, content, status, + item_json: _, } = item; let id = opt_id.unwrap_or_else(|| make_item_id(&item_type)); let created_at = Utc::now(); @@ -414,6 +416,7 @@ impl ConversationItemStorage for RedisConversationItemStorage { content, status, created_at, + item_json: None, }; let si = &self.store.schema.conversation_items; diff --git a/crates/data_connector/src/schema.rs b/crates/data_connector/src/schema.rs index 77102b576..9926f2fbb 100644 --- a/crates/data_connector/src/schema.rs +++ b/crates/data_connector/src/schema.rs @@ -353,8 +353,9 @@ fn core_columns_for(label: &str) -> &'static [&'static str] { "content", "status", "created_at", + "item_json", ], - "conversation_item_links" => &["conversation_id", "item_id", "added_at"], + "conversation_item_links" => &["conversation_id", "item_id", "added_at", "link_id"], "conversation_memories" => &[ "memory_id", "conversation_id", @@ -1029,4 +1030,10 @@ mod tests { assert!(err.contains("conversation_memories.extra_columns")); assert!(err.contains("shadows a core column name")); } + + #[test] + fn core_columns_registers_item_json_and_link_id() { + assert!(core_columns_for("conversation_items").contains(&"item_json")); + assert!(core_columns_for("conversation_item_links").contains(&"link_id")); + } } diff --git a/model_gateway/src/routers/common/persistence_utils.rs b/model_gateway/src/routers/common/persistence_utils.rs index 40e4904cc..9c56548b0 100644 --- a/model_gateway/src/routers/common/persistence_utils.rs +++ b/model_gateway/src/routers/common/persistence_utils.rs @@ -349,6 +349,7 @@ fn item_to_new_conversation_item( .get("status") .and_then(|v| v.as_str()) .map(String::from), + item_json: None, } } diff --git a/model_gateway/src/routers/conversations/handlers.rs b/model_gateway/src/routers/conversations/handlers.rs index 363a6efec..691297731 100644 --- a/model_gateway/src/routers/conversations/handlers.rs +++ b/model_gateway/src/routers/conversations/handlers.rs @@ -633,6 +633,7 @@ fn parse_item_from_value( role, content, status, + item_json: None, }, warning, )) diff --git a/scripts/oracle_flyway/schema-config.yaml b/scripts/oracle_flyway/schema-config.yaml index e6396a4e8..e9b8050e4 100644 --- a/scripts/oracle_flyway/schema-config.yaml +++ b/scripts/oracle_flyway/schema-config.yaml @@ -4,7 +4,7 @@ # forward migrations during e2e startup. Pin this to the latest core history # migration version that the Flyway-managed schema actually provides. Optional # skills/background tables are owned by their feature-specific schema lifecycle. -version: 3 +version: 12 auto_migrate: false conversations: diff --git a/scripts/oracle_flyway/sql/V4__Add_item_json_and_link_id.sql b/scripts/oracle_flyway/sql/V4__Add_item_json_and_link_id.sql new file mode 100644 index 000000000..317b20efe --- /dev/null +++ b/scripts/oracle_flyway/sql/V4__Add_item_json_and_link_id.sql @@ -0,0 +1,25 @@ +-- V4: Add ITEM_JSON column to CONVERSATION_ITEMS and LINK_ID column + +-- monotonic sequence to CONVERSATION_ITEM_LINKS. +-- +-- ITEM_JSON (CLOB with IS JSON check) holds the whole canonical OpenAI +-- item payload so reads can return the correct shape without per-type +-- reconstruction. LINK_ID (NUMBER, populated via CONV_ITEM_LINK_ID_SEQ) +-- is a strictly increasing per-link key that gives /v1/conversations/{id}/items +-- deterministic ordering (replacing ADDED_AT, ITEM_ID which is not unique). +-- +-- Mirrors crates/data_connector/src/oracle_migrations.rs::oracle_v12_up. + +------------------------------------------------------------ +-- 1. CONVERSATION_ITEMS: add ITEM_JSON +------------------------------------------------------------ +ALTER TABLE CONVERSATION_ITEMS ADD (ITEM_JSON CLOB CHECK (ITEM_JSON IS JSON)); + +------------------------------------------------------------ +-- 2. CONVERSATION_ITEM_LINKS: add LINK_ID + sequence +------------------------------------------------------------ +-- ORDER guarantees strictly monotonic NEXTVAL across RAC / multi-instance +-- deployments and is a no-op on single-instance Oracle. Required because +-- LINK_ID backs deterministic conversation-item ordering. +CREATE SEQUENCE CONV_ITEM_LINK_ID_SEQ START WITH 1 INCREMENT BY 1 NOCACHE ORDER; + +ALTER TABLE CONVERSATION_ITEM_LINKS ADD (LINK_ID NUMBER);