Skip to content
Open
4 changes: 4 additions & 0 deletions crates/data_connector/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ pub struct ConversationItem {
pub content: Value,
pub status: Option<String>,
pub created_at: DateTime<Utc>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub item_json: Option<Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -199,6 +201,8 @@ pub struct NewConversationItem {
pub role: Option<String>,
pub content: Value,
pub status: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub item_json: Option<Value>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
Expand Down
2 changes: 2 additions & 0 deletions crates/data_connector/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ mod tests {
role: Some("user".to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -451,6 +452,7 @@ mod tests {
role: Some("user".to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
})
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/hooked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ mod tests {
role: Some("user".to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -1401,6 +1402,7 @@ mod tests {
role: Some("user".to_string()),
content: json!("hello"),
status: None,
item_json: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -1474,6 +1476,7 @@ mod tests {
role: Some("user".to_string()),
content: json!("test"),
status: None,
item_json: None,
})
.await
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions crates/data_connector/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl ConversationItemStorage for MemoryConversationItemStorage {
content: new_item.content,
status: new_item.status,
created_at,
item_json: new_item.item_json,
Comment thread
khoaatra marked this conversation as resolved.
Outdated
};
self.inner.write().items.insert(id, item.clone());
Ok(item)
Expand Down Expand Up @@ -564,6 +565,7 @@ mod tests {
role: role.map(|r| r.to_string()),
content,
status: Some("completed".to_string()),
item_json: None,
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl ConversationItemStorage for NoOpConversationItemStorage {
content: item.content,
status: item.status,
created_at: Utc::now(),
item_json: item.item_json,
})
}

Expand Down Expand Up @@ -301,6 +302,7 @@ mod tests {
role: role.map(|r| r.to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
}
}

Expand Down Expand Up @@ -329,6 +331,7 @@ mod tests {
role: Some("assistant".to_string()),
content: json!(["hello"]),
status: Some("completed".to_string()),
item_json: None,
};
let item = store
.create_item(input)
Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ impl ConversationItemStorage for OracleConversationItemStorage {
role,
content,
status,
item_json,
} = item;
let id = opt_id.unwrap_or_else(|| make_item_id(&item_type));
let created_at = Utc::now();
Expand Down Expand Up @@ -763,6 +764,7 @@ impl ConversationItemStorage for OracleConversationItemStorage {
content,
status,
created_at,
item_json,
})
}

Expand Down Expand Up @@ -1147,6 +1149,7 @@ fn build_item_from_oracle_row(
content,
status,
created_at,
item_json: None,
})
}

Expand Down
96 changes: 94 additions & 2 deletions crates/data_connector/src/oracle_migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,20 @@ const ORACLE_V11: Migration = Migration {
description: "Create response_stream_chunks table",
up: oracle_v11_up,
};
const ORACLE_V12: Migration = Migration {
version: 12,
description: "Add item_json column and link_id sequence for canonical payloads and ordering",
up: oracle_v12_up,
};

/// Core history-backend migrations required by the SQL response/conversation
/// storage path during normal gateway startup.
pub(crate) static ORACLE_HISTORY_MIGRATIONS: [Migration; 3] = [ORACLE_V1, ORACLE_V2, ORACLE_V3];

/// Oracle migration list. Append new migrations here.
pub(crate) static ORACLE_MIGRATIONS: [Migration; 11] = [
pub(crate) static ORACLE_MIGRATIONS: [Migration; 12] = [
ORACLE_V1, ORACLE_V2, ORACLE_V3, ORACLE_V4, ORACLE_V5, ORACLE_V6, ORACLE_V7, ORACLE_V8,
ORACLE_V9, ORACLE_V10, ORACLE_V11,
ORACLE_V9, ORACLE_V10, ORACLE_V11, ORACLE_V12,
];
Comment thread
khoaatra marked this conversation as resolved.

fn oracle_v1_up(schema: &SchemaConfig) -> Vec<String> {
Expand Down Expand Up @@ -449,6 +454,62 @@ fn oracle_v11_up(schema: &SchemaConfig) -> Vec<String> {
]
}

/// Add `item_json` column to `conversation_items` and `link_id` column +
/// monotonic sequence to `conversation_item_links`.
///
/// `item_json` (CLOB with `IS JSON` check) holds the whole canonical OpenAI
/// item payload so reads can return the correct shape without per-type
/// reconstruction. `link_id` (NUMBER, populated via `CONV_ITEM_LINK_ID_SEQ`)
/// is a strictly increasing per-link key that gives `/v1/conversations/{id}/items`
/// deterministic ordering (replacing `added_at, item_id` which is not unique).
///
/// Both columns are added as nullable so the migration is non-blocking on
/// large tables.
fn oracle_v12_up(schema: &SchemaConfig) -> Vec<String> {
let si = &schema.conversation_items;
let sl = &schema.conversation_item_links;
let si_table = si.qualified_table(schema.owner.as_deref());
let sl_table = sl.qualified_table(schema.owner.as_deref());

let mut stmts: Vec<String> = Vec::new();

if !si.is_skipped("item_json") {
let item_json_col = si.col("item_json").to_uppercase();
// CLOB + `IS JSON` check ensures only valid JSON is accepted
// (Oracle < 21c has no native JSON type).
// ORA-01430 = "column being added already exists in table".
stmts.push(format!(
"BEGIN EXECUTE IMMEDIATE 'ALTER TABLE {si_table} ADD ({item_json_col} CLOB CHECK ({item_json_col} IS JSON))'; \
EXCEPTION WHEN OTHERS THEN IF SQLCODE != -1430 THEN RAISE; END IF; END;"
));
}

if !sl.is_skipped("link_id") {
let link_id_col = sl.col("link_id").to_uppercase();
let cid_col = sl.col("conversation_id").to_uppercase();
let seq = oracle_qualified_name(schema, "CONV_ITEM_LINK_ID_SEQ");
// Fixed short index name keeps emitted identifier under Oracle's
// 30-char limit even with a long custom table name.
let idx = oracle_qualified_name(schema, "IDX_CONV_LINK_ID");

// ORA-00955 = "name is already used by an existing object".
stmts.push(format!(
"BEGIN EXECUTE IMMEDIATE 'CREATE SEQUENCE {seq} START WITH 1 INCREMENT BY 1 NOCACHE NOORDER'; \
Comment thread
khoaatra marked this conversation as resolved.
Outdated
EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;"
));
stmts.push(format!(
"BEGIN EXECUTE IMMEDIATE 'ALTER TABLE {sl_table} ADD ({link_id_col} NUMBER)'; \
EXCEPTION WHEN OTHERS THEN IF SQLCODE != -1430 THEN RAISE; END IF; END;"
));
stmts.push(format!(
"BEGIN EXECUTE IMMEDIATE 'CREATE UNIQUE INDEX {idx} ON {sl_table} ({cid_col}, {link_id_col})'; \
Comment thread
khoaatra marked this conversation as resolved.
Outdated
EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;"
));
Comment thread
khoaatra marked this conversation as resolved.
Outdated
}

stmts
}

// ── Tests ──────────────────────────────────────────────────────────────────

#[cfg(test)]
Expand Down Expand Up @@ -680,6 +741,37 @@ mod tests {
);
}

// ── v12: add item_json column and link_id sequence ─────────────────────

#[test]
fn oracle_v12_up_adds_item_json_column_and_link_id_sequence() {
let schema = SchemaConfig {
owner: Some("OWNER".to_string()),
..Default::default()
};
let stmts = oracle_v12_up(&schema);
assert_eq!(stmts.len(), 4, "got: {stmts:?}");
assert!(stmts[0].contains("ADD (ITEM_JSON CLOB") && stmts[0].contains("IS JSON"));
assert!(stmts[1].contains("CREATE SEQUENCE OWNER.CONV_ITEM_LINK_ID_SEQ"));
assert!(stmts[2].contains("ADD (LINK_ID NUMBER)"));
assert!(stmts[3].contains("UNIQUE INDEX OWNER.IDX_CONV_LINK_ID"));
assert!(stmts[3].contains("(CONVERSATION_ID, LINK_ID)"));
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

#[test]
fn oracle_v12_up_respects_skip_columns() {
let mut schema = SchemaConfig::default();
schema
.conversation_items
.skip_columns
.insert("item_json".to_string());
schema
.conversation_item_links
.skip_columns
.insert("link_id".to_string());
assert!(oracle_v12_up(&schema).is_empty());
}

#[test]
fn oracle_v2_up_generates_plsql_drop_column() {
let schema = SchemaConfig::default();
Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ impl ConversationItemStorage for PostgresConversationItemStorage {
role,
content,
status,
item_json,
} = item;
let id = opt_id.unwrap_or_else(|| make_item_id(&item_type));
let created_at = Utc::now();
Expand Down Expand Up @@ -506,6 +507,7 @@ impl ConversationItemStorage for PostgresConversationItemStorage {
content,
status,
created_at,
item_json,
Comment thread
khoaatra marked this conversation as resolved.
Outdated
})
}

Expand Down Expand Up @@ -817,6 +819,7 @@ fn build_item_from_row(
content,
status,
created_at,
item_json: None,
})
}

Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ impl RedisConversationItemStorage {
content,
status,
created_at,
item_json: None,
})
}
}
Expand All @@ -401,6 +402,7 @@ impl ConversationItemStorage for RedisConversationItemStorage {
role,
content,
status,
item_json,
} = item;
let id = opt_id.unwrap_or_else(|| make_item_id(&item_type));
let created_at = Utc::now();
Expand All @@ -414,6 +416,7 @@ impl ConversationItemStorage for RedisConversationItemStorage {
content,
status,
created_at,
item_json,
};

let si = &self.store.schema.conversation_items;
Expand Down
9 changes: 8 additions & 1 deletion crates/data_connector/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,9 @@ fn core_columns_for(label: &str) -> &'static [&'static str] {
"content",
"status",
"created_at",
"item_json",
],
"conversation_item_links" => &["conversation_id", "item_id", "added_at"],
"conversation_item_links" => &["conversation_id", "item_id", "added_at", "link_id"],
Comment thread
khoaatra marked this conversation as resolved.
"conversation_memories" => &[
"memory_id",
"conversation_id",
Expand Down Expand Up @@ -1029,4 +1030,10 @@ mod tests {
assert!(err.contains("conversation_memories.extra_columns"));
assert!(err.contains("shadows a core column name"));
}

#[test]
fn core_columns_registers_item_json_and_link_id() {
assert!(core_columns_for("conversation_items").contains(&"item_json"));
assert!(core_columns_for("conversation_item_links").contains(&"link_id"));
}
}
1 change: 1 addition & 0 deletions model_gateway/src/routers/common/persistence_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ fn item_to_new_conversation_item(
.get("status")
.and_then(|v| v.as_str())
.map(String::from),
item_json: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions model_gateway/src/routers/conversations/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ fn parse_item_from_value(
role,
content,
status,
item_json: None,
},
warning,
))
Expand Down