Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/data_connector/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ pub struct ConversationItem {
pub content: Value,
pub status: Option<String>,
pub created_at: DateTime<Utc>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub item_json: Option<Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -199,6 +201,8 @@ pub struct NewConversationItem {
pub role: Option<String>,
pub content: Value,
pub status: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub item_json: Option<Value>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
Expand Down
2 changes: 2 additions & 0 deletions crates/data_connector/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ mod tests {
role: Some("user".to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -451,6 +452,7 @@ mod tests {
role: Some("user".to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
})
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/hooked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ mod tests {
role: Some("user".to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -1401,6 +1402,7 @@ mod tests {
role: Some("user".to_string()),
content: json!("hello"),
status: None,
item_json: None,
})
.await
.unwrap();
Expand Down Expand Up @@ -1474,6 +1476,7 @@ mod tests {
role: Some("user".to_string()),
content: json!("test"),
status: None,
item_json: None,
})
.await
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions crates/data_connector/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl ConversationItemStorage for MemoryConversationItemStorage {
content: new_item.content,
status: new_item.status,
created_at,
item_json: None,
};
self.inner.write().items.insert(id, item.clone());
Ok(item)
Expand Down Expand Up @@ -564,6 +565,7 @@ mod tests {
role: role.map(|r| r.to_string()),
content,
status: Some("completed".to_string()),
item_json: None,
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl ConversationItemStorage for NoOpConversationItemStorage {
content: item.content,
status: item.status,
created_at: Utc::now(),
item_json: None,
})
}

Expand Down Expand Up @@ -301,6 +302,7 @@ mod tests {
role: role.map(|r| r.to_string()),
content: json!([]),
status: Some("completed".to_string()),
item_json: None,
}
}

Expand Down Expand Up @@ -329,6 +331,7 @@ mod tests {
role: Some("assistant".to_string()),
content: json!(["hello"]),
status: Some("completed".to_string()),
item_json: None,
};
let item = store
.create_item(input)
Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ impl ConversationItemStorage for OracleConversationItemStorage {
role,
content,
status,
item_json: _,
} = item;
let id = opt_id.unwrap_or_else(|| make_item_id(&item_type));
let created_at = Utc::now();
Expand Down Expand Up @@ -763,6 +764,7 @@ impl ConversationItemStorage for OracleConversationItemStorage {
content,
status,
created_at,
item_json: None,
})
}

Expand Down Expand Up @@ -1147,6 +1149,7 @@ fn build_item_from_oracle_row(
content,
status,
created_at,
item_json: None,
})
}

Expand Down
109 changes: 105 additions & 4 deletions crates/data_connector/src/oracle_migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,21 @@ const ORACLE_V11: Migration = Migration {
description: "Create response_stream_chunks table",
up: oracle_v11_up,
};
const ORACLE_V12: Migration = Migration {
version: 12,
description: "Add item_json column and link_id sequence for canonical payloads and ordering",
up: oracle_v12_up,
};

/// Core history-backend migrations required by the SQL response/conversation
/// storage path during normal gateway startup.
pub(crate) static ORACLE_HISTORY_MIGRATIONS: [Migration; 3] = [ORACLE_V1, ORACLE_V2, ORACLE_V3];
pub(crate) static ORACLE_HISTORY_MIGRATIONS: [Migration; 4] =
[ORACLE_V1, ORACLE_V2, ORACLE_V3, ORACLE_V12];

/// Oracle migration list. Append new migrations here.
pub(crate) static ORACLE_MIGRATIONS: [Migration; 11] = [
pub(crate) static ORACLE_MIGRATIONS: [Migration; 12] = [
ORACLE_V1, ORACLE_V2, ORACLE_V3, ORACLE_V4, ORACLE_V5, ORACLE_V6, ORACLE_V7, ORACLE_V8,
ORACLE_V9, ORACLE_V10, ORACLE_V11,
ORACLE_V9, ORACLE_V10, ORACLE_V11, ORACLE_V12,
];
Comment thread
khoaatra marked this conversation as resolved.

fn oracle_v1_up(schema: &SchemaConfig) -> Vec<String> {
Expand Down Expand Up @@ -449,6 +455,65 @@ fn oracle_v11_up(schema: &SchemaConfig) -> Vec<String> {
]
}

/// Add `item_json` column to `conversation_items` and `link_id` column +
/// monotonic sequence to `conversation_item_links`.
///
/// `item_json` (CLOB with `IS JSON` check) holds the whole canonical OpenAI
/// item payload so reads can return the correct shape without per-type
/// reconstruction. `link_id` (NUMBER, populated via `CONV_ITEM_LINK_ID_SEQ`)
/// is a strictly increasing per-link key that gives `/v1/conversations/{id}/items`
/// deterministic ordering (replacing `added_at, item_id` which is not unique).
///
/// Both columns are added as nullable so the migration is non-blocking on
/// large tables.
fn oracle_v12_up(schema: &SchemaConfig) -> Vec<String> {
let si = &schema.conversation_items;
let sl = &schema.conversation_item_links;
let si_table = si.qualified_table(schema.owner.as_deref());
let sl_table = sl.qualified_table(schema.owner.as_deref());

let mut stmts: Vec<String> = Vec::new();

if !si.is_skipped("item_json") {
let item_json_col = si.col("item_json").to_uppercase();
// CLOB + `IS JSON` check ensures only valid JSON is accepted
// (Oracle < 21c has no native JSON type).
// ORA-01430 = "column being added already exists in table".
stmts.push(format!(
"BEGIN EXECUTE IMMEDIATE 'ALTER TABLE {si_table} ADD ({item_json_col} CLOB CHECK ({item_json_col} IS JSON))'; \
EXCEPTION WHEN OTHERS THEN IF SQLCODE != -1430 THEN RAISE; END IF; END;"
));
}

if !sl.is_skipped("link_id") {
let link_id_col = sl.col("link_id").to_uppercase();
let seq = oracle_qualified_name(schema, "CONV_ITEM_LINK_ID_SEQ");

// ORA-00955 = "name is already used by an existing object".
// ORDER (vs NOORDER) makes the sequence strictly monotonic in request
// order across RAC / multi-instance deployments at the cost of a
// cross-instance lock; on single-instance Oracle (e.g. ATP serverless)
// it is a no-op. Required for the deterministic ordering goal that
// LINK_ID backs.
stmts.push(format!(
"BEGIN EXECUTE IMMEDIATE 'CREATE SEQUENCE {seq} START WITH 1 INCREMENT BY 1 NOCACHE ORDER'; \
EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;"
));
stmts.push(format!(
"BEGIN EXECUTE IMMEDIATE 'ALTER TABLE {sl_table} ADD ({link_id_col} NUMBER)'; \
EXCEPTION WHEN OTHERS THEN IF SQLCODE != -1430 THEN RAISE; END IF; END;"
));
// The UNIQUE INDEX on (conversation_id, link_id) is intentionally NOT
// created here. Creating it while LINK_ID is universally NULL caused
// every link_item INSERT past the first per-conversation to fail in
// e2e (only the first input message survived). It will be added in
// PR 3 immediately after backfilling LINK_ID with sequence values
// and promoting the column to NOT NULL.
}

stmts
}

// ── Tests ──────────────────────────────────────────────────────────────────

#[cfg(test)]
Expand All @@ -462,7 +527,7 @@ mod tests {
.iter()
.map(|migration| migration.version)
.collect();
assert_eq!(versions, vec![1, 2, 3]);
assert_eq!(versions, vec![1, 2, 3, 12]);
}

#[test]
Expand Down Expand Up @@ -680,6 +745,42 @@ mod tests {
);
}

// ── v12: add item_json column and link_id sequence ─────────────────────

#[test]
fn oracle_v12_up_adds_item_json_column_and_link_id_sequence() {
let schema = SchemaConfig {
owner: Some("OWNER".to_string()),
..Default::default()
};
let stmts = oracle_v12_up(&schema);
assert_eq!(stmts.len(), 3, "got: {stmts:?}");
assert!(stmts[0].contains("ADD (ITEM_JSON CLOB") && stmts[0].contains("IS JSON"));
assert!(stmts[1].contains("CREATE SEQUENCE OWNER.CONV_ITEM_LINK_ID_SEQ"));
// Leading space distinguishes ORDER from NOORDER. Required for
// strictly monotonic NEXTVAL on RAC / multi-instance deployments.
assert!(
stmts[1].contains(" ORDER"),
"sequence must use ORDER for RAC determinism: {}",
stmts[1]
);
assert!(stmts[2].contains("ADD (LINK_ID NUMBER)"));
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

#[test]
fn oracle_v12_up_respects_skip_columns() {
let mut schema = SchemaConfig::default();
schema
.conversation_items
.skip_columns
.insert("item_json".to_string());
schema
.conversation_item_links
.skip_columns
.insert("link_id".to_string());
assert!(oracle_v12_up(&schema).is_empty());
}

#[test]
fn oracle_v2_up_generates_plsql_drop_column() {
let schema = SchemaConfig::default();
Expand Down
3 changes: 3 additions & 0 deletions crates/data_connector/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ impl ConversationItemStorage for PostgresConversationItemStorage {
role,
content,
status,
item_json: _,
Comment thread
khoaatra marked this conversation as resolved.
Comment thread
khoaatra marked this conversation as resolved.
} = item;
let id = opt_id.unwrap_or_else(|| make_item_id(&item_type));
let created_at = Utc::now();
Expand Down Expand Up @@ -506,6 +507,7 @@ impl ConversationItemStorage for PostgresConversationItemStorage {
content,
status,
created_at,
item_json: None,
})
}

Expand Down Expand Up @@ -817,6 +819,7 @@ fn build_item_from_row(
content,
status,
created_at,
item_json: None,
})
}

Expand Down
Loading
Loading