diff --git a/Cargo.lock b/Cargo.lock index 16acc06b38704..cdece97702ad6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12600,8 +12600,8 @@ dependencies = [ "fail", "futures", "glob", + "indexmap 2.12.0", "itertools 0.14.0", - "lru", "madsim", "madsim-rdkafka", "madsim-tokio", @@ -12814,7 +12814,6 @@ dependencies = [ "iceberg", "itertools 0.14.0", "jsonbb", - "lru", "madsim-tokio", "madsim-tonic", "maplit", diff --git a/Cargo.toml b/Cargo.toml index c2a81b84d5d45..8e94dde6973ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -175,7 +175,7 @@ iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d ] } iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d1b8cabf4f8a33090d018b539bfec6e7c623d7c4" } iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d1b8cabf4f8a33090d018b539bfec6e7c623d7c4" } - +indexmap = { version = "2.12.0", features = ["serde"] } itertools = "0.14.0" jni = { version = "0.21.1", features = ["invocation"] } jsonbb = "0.1.4" diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index a38d6a600eb57..28b9bc83e8d10 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -56,7 +56,6 @@ jiff = "0.1.15" jni = { workspace = true } jsonbb = { workspace = true } linkme = { workspace = true } -lru = { workspace = true } memcomparable = { version = "0.2", features = ["decimal"] } num-integer = "0.1" num-traits = "0.2" @@ -144,6 +143,7 @@ mach2 = "0.4" coarsetime = "0.1" criterion = { workspace = true } expect-test = "1" +lru = { workspace = true } more-asserts = "0.3" pretty_assertions = "1" rand = { workspace = true } diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 9843fd1800c61..73fdf687ef3e7 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -70,7 +70,7 @@ google-cloud-pubsub = { package = "gcloud-pubsub", version = "1" } iceberg = { workspace = true } iceberg-catalog-glue = { workspace = true } iceberg-catalog-rest = { workspace = true } -indexmap = { version = "2.12.0", features = ["serde"] } +indexmap = { workspace = true } itertools = { workspace = true } jni = { workspace = true } maplit = "1.0.2" diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 051279f130286..4c329953a7fe2 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -33,7 +33,7 @@ futures-async-stream = { workspace = true } hex = "0.4" http = "1" iceberg = { workspace = true } -indexmap = { version = "2.12.0", features = ["serde"] } +indexmap = { workspace = true } itertools = { workspace = true } jsonbb = { workspace = true } maplit = "1.0.2" diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 427ca43dc9a10..bf67f1c397c29 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -34,7 +34,6 @@ hytra = "0.1.2" iceberg = { workspace = true } itertools = { workspace = true } jsonbb = { workspace = true } -lru = { workspace = true } maplit = "1.0.2" memcomparable = "0.2" multimap = "0.10" diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index d13b536d7a240..ad8d588d0f05e 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -19,7 +19,6 @@ use either::Either; use futures::TryStreamExt; use futures::stream::{self, PollNext}; use itertools::Itertools; -use lru::DefaultHasher; use risingwave_common::array::Op; use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::hash::{HashKey, NullBitmap}; @@ -107,7 +106,7 @@ struct TemporalSide { source: BatchTable, table_stream_key_indices: Vec, table_output_indices: Vec, - cache: ManagedLruCache, + cache: ManagedLruCache, join_key_data_types: Vec, } @@ -618,11 +617,7 @@ impl, -} - -impl Default for SetStmts { - fn default() -> Self { - Self { - stmts_cache: LruCache::unbounded(), - } - } -} - -struct SetStmtsIterator<'a, 'b> -where - 'a: 'b, -{ - _stmts: &'a SetStmts, - stmts_iter: core::iter::Rev>, -} - -impl<'a> SetStmtsIterator<'a, '_> { - fn new(stmts: &'a SetStmts) -> Self { - Self { - _stmts: stmts, - stmts_iter: stmts.stmts_cache.iter().rev(), - } - } + // variable name -> last set statement + stmts: IndexMap, } impl SetStmts { @@ -78,19 +55,14 @@ impl SetStmts { } => { let key = variable.real_value().to_lowercase(); // store complete sql as value. - self.stmts_cache.put(key, sql.to_owned()); + self.stmts.insert(key, sql.to_owned()); } _ => unreachable!(), } } -} - -impl Iterator for SetStmtsIterator<'_, '_> { - type Item = String; - fn next(&mut self) -> Option { - let (_, stmt) = self.stmts_iter.next()?; - Some(stmt.clone()) + fn replay_iter(&self) -> impl Iterator + '_ { + self.stmts.values().map(|s| s.as_str()) } } @@ -130,8 +102,8 @@ impl RisingWave { } }); // replay all SET statements - for stmt in SetStmtsIterator::new(set_stmts) { - client.simple_query(&stmt).await?; + for stmt in set_stmts.replay_iter() { + client.simple_query(stmt).await?; } Ok((client, task)) }