Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/data_connector/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ pub struct ConversationItem {
pub content: Value,
pub status: Option<String>,
pub created_at: DateTime<Utc>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub item_json: Option<Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -199,6 +201,8 @@ pub struct NewConversationItem {
pub role: Option<String>,
pub content: Value,
pub status: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub item_json: Option<Value>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
Expand Down
2 changes: 2 additions & 0 deletions crates/data_connector/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ mod tests {
role: Some("user".to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -451,6 +452,7 @@ mod tests {
role: Some("user".to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
})
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/hooked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ mod tests {
role: Some("user".to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -1401,6 +1402,7 @@ mod tests {
role: Some("user".to_string()),
content: json!("hello"),
status: None,
item_json: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -1474,6 +1476,7 @@ mod tests {
role: Some("user".to_string()),
content: json!("test"),
status: None,
item_json: None,
})
.await
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions crates/data_connector/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl ConversationItemStorage for MemoryConversationItemStorage {
content: new_item.content,
status: new_item.status,
created_at,
item_json: None,
};
self.inner.write().items.insert(id, item.clone());
Ok(item)
Expand Down Expand Up @@ -564,6 +565,7 @@ mod tests {
role: role.map(|r| r.to_string()),
content,
status: Some("completed".to_string()),
item_json: None,
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl ConversationItemStorage for NoOpConversationItemStorage {
content: item.content,
status: item.status,
created_at: Utc::now(),
item_json: None,
})
}

Expand Down Expand Up @@ -301,6 +302,7 @@ mod tests {
role: role.map(|r| r.to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
}
}

Expand Down Expand Up @@ -329,6 +331,7 @@ mod tests {
role: Some("assistant".to_string()),
content: json!(["hello"]),
status: Some("completed".to_string()),
item_json: None,
};
let item = store
.create_item(input)
Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ impl ConversationItemStorage for OracleConversationItemStorage {
role,
content,
status,
item_json: _,
} = item;
let id = opt_id.unwrap_or_else(|| make_item_id(&item_type));
let created_at = Utc::now();
Expand Down Expand Up @@ -763,6 +764,7 @@ impl ConversationItemStorage for OracleConversationItemStorage {
content,
status,
created_at,
item_json: None,
})
}

Expand Down Expand Up @@ -1147,6 +1149,7 @@ fn build_item_from_oracle_row(
content,
status,
created_at,
item_json: None,
})
}

Expand Down
109 changes: 105 additions & 4 deletions crates/data_connector/src/oracle_migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,21 @@ const ORACLE_V11: Migration = Migration {
description: "Create response_stream_chunks table",
up: oracle_v11_up,
};
const ORACLE_V12: Migration = Migration {
version: 12,
description: "Add item_json column and link_id sequence for canonical payloads and ordering",
up: oracle_v12_up,
};

/// Core history-backend migrations required by the SQL response/conversation
/// storage path during normal gateway startup.
pub(crate) static ORACLE_HISTORY_MIGRATIONS: [Migration; 3] = [ORACLE_V1, ORACLE_V2, ORACLE_V3];
pub(crate) static ORACLE_HISTORY_MIGRATIONS: [Migration; 4] =
[ORACLE_V1, ORACLE_V2, ORACLE_V3, ORACLE_V12];

/// Oracle migration list. Append new migrations here.
pub(crate) static ORACLE_MIGRATIONS: [Migration; 11] = [
pub(crate) static ORACLE_MIGRATIONS: [Migration; 12] = [
ORACLE_V1, ORACLE_V2, ORACLE_V3, ORACLE_V4, ORACLE_V5, ORACLE_V6, ORACLE_V7, ORACLE_V8,
ORACLE_V9, ORACLE_V10, ORACLE_V11,
ORACLE_V9, ORACLE_V10, ORACLE_V11, ORACLE_V12,
];
Comment thread
khoaatra marked this conversation as resolved.

fn oracle_v1_up(schema: &SchemaConfig) -> Vec<String> {
Expand Down Expand Up @@ -449,6 +455,65 @@ fn oracle_v11_up(schema: &SchemaConfig) -> Vec<String> {
]
}

/// Add `item_json` column to `conversation_items` and `link_id` column +
/// monotonic sequence to `conversation_item_links`.
///
/// `item_json` (CLOB with `IS JSON` check) holds the whole canonical OpenAI
/// item payload so reads can return the correct shape without per-type
/// reconstruction. `link_id` (NUMBER, populated via `CONV_ITEM_LINK_ID_SEQ`)
/// is a strictly increasing per-link key that gives `/v1/conversations/{id}/items`
/// deterministic ordering (replacing `added_at, item_id` which is not unique).
///
/// Both columns are added as nullable so the migration is non-blocking on
/// large tables.
fn oracle_v12_up(schema: &SchemaConfig) -> Vec<String> {
let si = &schema.conversation_items;
let sl = &schema.conversation_item_links;
let si_table = si.qualified_table(schema.owner.as_deref());
let sl_table = sl.qualified_table(schema.owner.as_deref());

let mut stmts: Vec<String> = Vec::new();

if !si.is_skipped("item_json") {
let item_json_col = si.col("item_json").to_uppercase();
// CLOB + `IS JSON` check ensures only valid JSON is accepted
// (Oracle < 21c has no native JSON type).
// ORA-01430 = "column being added already exists in table".
stmts.push(format!(
"BEGIN EXECUTE IMMEDIATE 'ALTER TABLE {si_table} ADD ({item_json_col} CLOB CHECK ({item_json_col} IS JSON))'; \
EXCEPTION WHEN OTHERS THEN IF SQLCODE != -1430 THEN RAISE; END IF; END;"
));
}

if !sl.is_skipped("link_id") {
let link_id_col = sl.col("link_id").to_uppercase();
let seq = oracle_qualified_name(schema, "CONV_ITEM_LINK_ID_SEQ");

// ORA-00955 = "name is already used by an existing object".
// ORDER (vs NOORDER) makes the sequence strictly monotonic in request
// order across RAC / multi-instance deployments at the cost of a
// cross-instance lock; on single-instance Oracle (e.g. ATP serverless)
// it is a no-op. Required for the deterministic ordering goal that
// LINK_ID backs.
stmts.push(format!(
"BEGIN EXECUTE IMMEDIATE 'CREATE SEQUENCE {seq} START WITH 1 INCREMENT BY 1 NOCACHE ORDER'; \
EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;"
));
stmts.push(format!(
"BEGIN EXECUTE IMMEDIATE 'ALTER TABLE {sl_table} ADD ({link_id_col} NUMBER)'; \
EXCEPTION WHEN OTHERS THEN IF SQLCODE != -1430 THEN RAISE; END IF; END;"
));
// The UNIQUE INDEX on (conversation_id, link_id) is intentionally NOT
// created here. Creating it while LINK_ID is universally NULL caused
// every link_item INSERT past the first per-conversation to fail in
// e2e (only the first input message survived). It will be added in
// PR 3 immediately after backfilling LINK_ID with sequence values
// and promoting the column to NOT NULL.
}

stmts
}

// ── Tests ──────────────────────────────────────────────────────────────────

#[cfg(test)]
Expand All @@ -462,7 +527,7 @@ mod tests {
.iter()
.map(|migration| migration.version)
.collect();
assert_eq!(versions, vec![1, 2, 3]);
assert_eq!(versions, vec![1, 2, 3, 12]);
}

#[test]
Expand Down Expand Up @@ -680,6 +745,42 @@ mod tests {
);
}

// ── v12: add item_json column and link_id sequence ─────────────────────

#[test]
fn oracle_v12_up_adds_item_json_column_and_link_id_sequence() {
let schema = SchemaConfig {
owner: Some("OWNER".to_string()),
..Default::default()
};
let stmts = oracle_v12_up(&schema);
assert_eq!(stmts.len(), 3, "got: {stmts:?}");
assert!(stmts[0].contains("ADD (ITEM_JSON CLOB") && stmts[0].contains("IS JSON"));
assert!(stmts[1].contains("CREATE SEQUENCE OWNER.CONV_ITEM_LINK_ID_SEQ"));
// Leading space distinguishes ORDER from NOORDER. Required for
// strictly monotonic NEXTVAL on RAC / multi-instance deployments.
assert!(
stmts[1].contains(" ORDER"),
"sequence must use ORDER for RAC determinism: {}",
stmts[1]
);
assert!(stmts[2].contains("ADD (LINK_ID NUMBER)"));
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

#[test]
fn oracle_v12_up_respects_skip_columns() {
let mut schema = SchemaConfig::default();
schema
.conversation_items
.skip_columns
.insert("item_json".to_string());
schema
.conversation_item_links
.skip_columns
.insert("link_id".to_string());
assert!(oracle_v12_up(&schema).is_empty());
}

#[test]
fn oracle_v2_up_generates_plsql_drop_column() {
let schema = SchemaConfig::default();
Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ impl ConversationItemStorage for PostgresConversationItemStorage {
role,
content,
status,
item_json: _,
Comment thread
khoaatra marked this conversation as resolved.
Comment thread
khoaatra marked this conversation as resolved.
} = item;
let id = opt_id.unwrap_or_else(|| make_item_id(&item_type));
let created_at = Utc::now();
Expand Down Expand Up @@ -506,6 +507,7 @@ impl ConversationItemStorage for PostgresConversationItemStorage {
content,
status,
created_at,
item_json: None,
})
}

Expand Down Expand Up @@ -817,6 +819,7 @@ fn build_item_from_row(
content,
status,
created_at,
item_json: None,
})
}

Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ impl RedisConversationItemStorage {
content,
status,
created_at,
item_json: None,
})
}
}
Expand All @@ -401,6 +402,7 @@ impl ConversationItemStorage for RedisConversationItemStorage {
role,
content,
status,
item_json: _,
} = item;
let id = opt_id.unwrap_or_else(|| make_item_id(&item_type));
let created_at = Utc::now();
Expand All @@ -414,6 +416,7 @@ impl ConversationItemStorage for RedisConversationItemStorage {
content,
status,
created_at,
item_json: None,
};

let si = &self.store.schema.conversation_items;
Expand Down
9 changes: 8 additions & 1 deletion crates/data_connector/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,9 @@ fn core_columns_for(label: &str) -> &'static [&'static str] {
"content",
"status",
"created_at",
"item_json",
],
"conversation_item_links" => &["conversation_id", "item_id", "added_at"],
"conversation_item_links" => &["conversation_id", "item_id", "added_at", "link_id"],
Comment thread
khoaatra marked this conversation as resolved.
"conversation_memories" => &[
"memory_id",
"conversation_id",
Expand Down Expand Up @@ -1029,4 +1030,10 @@ mod tests {
assert!(err.contains("conversation_memories.extra_columns"));
assert!(err.contains("shadows a core column name"));
}

#[test]
fn core_columns_registers_item_json_and_link_id() {
assert!(core_columns_for("conversation_items").contains(&"item_json"));
assert!(core_columns_for("conversation_item_links").contains(&"link_id"));
}
}
1 change: 1 addition & 0 deletions model_gateway/src/routers/common/persistence_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ fn item_to_new_conversation_item(
.get("status")
.and_then(|v| v.as_str())
.map(String::from),
item_json: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions model_gateway/src/routers/conversations/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ fn parse_item_from_value(
role,
content,
status,
item_json: None,
},
warning,
))
Expand Down
2 changes: 1 addition & 1 deletion scripts/oracle_flyway/schema-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# forward migrations during e2e startup. Pin this to the latest core history
# migration version that the Flyway-managed schema actually provides. Optional
# skills/background tables are owned by their feature-specific schema lifecycle.
version: 3
version: 12
auto_migrate: false

conversations:
Expand Down
25 changes: 25 additions & 0 deletions scripts/oracle_flyway/sql/V4__Add_item_json_and_link_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- V4: Add ITEM_JSON column to CONVERSATION_ITEMS and LINK_ID column +
-- monotonic sequence to CONVERSATION_ITEM_LINKS.
--
-- ITEM_JSON (CLOB with IS JSON check) holds the whole canonical OpenAI
-- item payload so reads can return the correct shape without per-type
-- reconstruction. LINK_ID (NUMBER, populated via CONV_ITEM_LINK_ID_SEQ)
-- is a strictly increasing per-link key that gives /v1/conversations/{id}/items
-- deterministic ordering (replacing ADDED_AT, ITEM_ID which is not unique).
--
-- Mirrors crates/data_connector/src/oracle_migrations.rs::oracle_v12_up.

------------------------------------------------------------
-- 1. CONVERSATION_ITEMS: add ITEM_JSON
------------------------------------------------------------
ALTER TABLE CONVERSATION_ITEMS ADD (ITEM_JSON CLOB CHECK (ITEM_JSON IS JSON));

------------------------------------------------------------
-- 2. CONVERSATION_ITEM_LINKS: add LINK_ID + sequence
------------------------------------------------------------
-- ORDER guarantees strictly monotonic NEXTVAL across RAC / multi-instance
-- deployments; no-op on single-instance Oracle. Required because LINK_ID
-- backs deterministic conversation-item ordering.
CREATE SEQUENCE CONV_ITEM_LINK_ID_SEQ START WITH 1 INCREMENT BY 1 NOCACHE ORDER;

ALTER TABLE CONVERSATION_ITEM_LINKS ADD (LINK_ID NUMBER);
Comment thread
khoaatra marked this conversation as resolved.