diff --git a/Cargo.lock b/Cargo.lock
index f1a6e60f2880d..7183c990bfce9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1751,7 +1751,7 @@ dependencies = [
  "http 1.1.0",
  "http-body 1.0.0",
  "http-body-util",
- "hyper 1.1.0",
+ "hyper 1.4.1",
  "hyper-util",
  "itoa",
  "matchit",
@@ -2791,22 +2791,22 @@ dependencies = [
 
 [[package]]
 name = "console-api"
-version = "0.7.0"
+version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a257c22cd7e487dd4a13d413beabc512c5052f0bc048db0da6a84c3d8a6142fd"
+checksum = "86ed14aa9c9f927213c6e4f3ef75faaad3406134efe84ba2cb7983431d5f0931"
 dependencies = [
  "futures-core",
- "prost 0.12.1",
- "prost-types 0.12.1",
- "tonic 0.11.0",
+ "prost 0.13.1",
+ "prost-types 0.13.1",
+ "tonic 0.12.1",
  "tracing-core",
 ]
 
 [[package]]
 name = "console-subscriber"
-version = "0.3.0"
+version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "31c4cc54bae66f7d9188996404abdf7fdfa23034ef8e43478c8810828abad758"
+checksum = "e2e3a111a37f3333946ebf9da370ba5c5577b18eb342ec683eb488dd21980302"
 dependencies = [
  "console-api",
  "crossbeam-channel",
@@ -2814,14 +2814,15 @@ dependencies = [
  "futures-task",
  "hdrhistogram",
  "humantime",
- "prost 0.12.1",
- "prost-types 0.12.1",
+ "hyper-util",
+ "prost 0.13.1",
+ "prost-types 0.13.1",
  "serde",
  "serde_json",
  "thread_local",
  "tokio",
- "tokio-stream",
- "tonic 0.11.0",
+ "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tonic 0.12.1",
  "tracing",
  "tracing-core",
  "tracing-subscriber",
@@ -4436,15 +4437,15 @@ dependencies = [
 
 [[package]]
 name = "etcd-client"
-version = "0.12.4"
+version = "0.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4ae697f3928e8c89ae6f4dcf788059f49fd01a76dc53e63628f5a33881f5715e"
+checksum = "39bde3ce50a626efeb1caa9ab1083972d178bebb55ca627639c8ded507dfcbde"
 dependencies = [
- "http 0.2.9",
- "prost 0.12.1",
+ "http 1.1.0",
+ "prost 0.13.1",
  "tokio",
- "tokio-stream",
- "tonic 0.10.2",
+ "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tonic 0.12.1",
  "tonic-build",
  "tower",
  "tower-service",
@@ -5185,7 +5186,7 @@ dependencies = [
  "thiserror",
  "time",
  "tokio",
- "tokio-stream",
+ "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
  "url",
  "yup-oauth2",
 ]
@@ -5301,9 +5302,9 @@ dependencies = [
 
 [[package]]
 name = "google-cloud-auth"
-version = "0.15.0"
+version = "0.16.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e09ed5b2998bc8d0d3df09c859028210d4961b8fe779cfda8dc8ca4e83d5def2"
+checksum = "1112c453c2e155b3e683204ffff52bcc6d6495d04b68d9e90cd24161270c5058"
 dependencies = [
  "async-trait",
  "base64 0.21.7",
@@ -5323,9 +5324,9 @@ dependencies = [
 
 [[package]]
 name = "google-cloud-bigquery"
-version = "0.9.0"
+version = "0.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1e321c127945bb44a5cf5129c37530e2494b97afefe7f334a983ac754e40914e"
+checksum = "305cb7214d11b719e9f00f982c1ee1304c674f7a8dfc44a43b8bad3c909750c2"
 dependencies = [
  "anyhow",
  "arrow 50.0.0",
@@ -5350,29 +5351,29 @@ dependencies = [
 
 [[package]]
 name = "google-cloud-gax"
-version = "0.17.0"
+version = "0.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8cb60314136e37de9e2a05ddb427b9c5a39c3d188de2e2f026c6af74425eef44"
+checksum = "9c3eaaad103912825594d674a4b1e556ccbb05a13a6cac17dcfd871997fb760a"
 dependencies = [
  "google-cloud-token",
- "http 0.2.9",
+ "http 1.1.0",
  "thiserror",
  "tokio",
  "tokio-retry",
- "tonic 0.10.2",
+ "tonic 0.12.1",
  "tower",
  "tracing",
 ]
 
 [[package]]
 name = "google-cloud-googleapis"
-version = "0.13.0"
+version = "0.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "32cd184c52aa2619ac1b16ad8b5a752e91d25be88a8cf08eaec19777dfacbe54"
+checksum = "0ae8ab26ef7c7c3f7dfb9cc3982293d031d8e78c85d00ddfb704b5c35aeff7c8"
 dependencies = [
- "prost 0.12.1",
- "prost-types 0.12.1",
- "tonic 0.10.2",
+ "prost 0.13.1",
+ "prost-types 0.13.1",
+ "tonic 0.12.1",
 ]
 
 [[package]]
@@ -5388,9 +5389,9 @@ dependencies = [
 
 [[package]]
 name = "google-cloud-pubsub"
-version = "0.25.0"
+version = "0.28.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a35e4a008db5cf01a5c03d3c67bd90b3cad77427ca949f3c8eddd90c4a3c932"
+checksum = "55ef73601dcec5ea144e59969e921d35d66000211603fee8023b7947af09248f"
 dependencies = [
  "async-channel 1.9.0",
  "async-stream",
@@ -5398,7 +5399,7 @@ dependencies = [
  "google-cloud-gax",
  "google-cloud-googleapis",
  "google-cloud-token",
- "prost-types 0.12.1",
+ "prost-types 0.13.1",
  "thiserror",
  "tokio",
  "tokio-util",
@@ -5407,9 +5408,9 @@ dependencies = [
 
 [[package]]
 name = "google-cloud-token"
-version = "0.1.1"
+version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0fcd62eb34e3de2f085bcc33a09c3e17c4f65650f36d53eb328b00d63bcb536a"
+checksum = "8f49c12ba8b21d128a2ce8585955246977fbce4415f680ebf9199b6f9d6d725f"
 dependencies = [
  "async-trait",
 ]
@@ -5595,9 +5596,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
 
 [[package]]
 name = "hermit-abi"
-version = "0.3.2"
+version = "0.3.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b"
+checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
 
 [[package]]
 name = "hex"
@@ -5760,9 +5761,9 @@ dependencies = [
 
 [[package]]
 name = "hyper"
-version = "1.1.0"
+version = "1.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75"
+checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
 dependencies = [
  "bytes",
  "futures-channel",
@@ -5774,6 +5775,7 @@ dependencies = [
  "httpdate",
  "itoa",
  "pin-project-lite",
+ "smallvec",
  "tokio",
  "want",
 ]
@@ -5803,7 +5805,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
 dependencies = [
  "futures-util",
  "http 1.1.0",
- "hyper 1.1.0",
+ "hyper 1.4.1",
  "hyper-util",
  "rustls 0.22.4",
  "rustls-pki-types",
@@ -5824,6 +5826,19 @@ dependencies = [
  "tokio-io-timeout",
 ]
 
+[[package]]
+name = "hyper-timeout"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
+dependencies = [
+ "hyper 1.4.1",
+ "hyper-util",
+ "pin-project-lite",
+ "tokio",
+ "tower-service",
+]
+
 [[package]]
 name = "hyper-tls"
 version = "0.5.0"
@@ -5845,7 +5860,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
 dependencies = [
  "bytes",
  "http-body-util",
- "hyper 1.1.0",
+ "hyper 1.4.1",
  "hyper-util",
  "native-tls",
  "tokio",
@@ -5855,16 +5870,16 @@ dependencies = [
 
 [[package]]
 name = "hyper-util"
-version = "0.1.3"
+version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa"
+checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956"
 dependencies = [
  "bytes",
  "futures-channel",
  "futures-util",
  "http 1.1.0",
  "http-body 1.0.0",
- "hyper 1.1.0",
+ "hyper 1.4.1",
  "pin-project-lite",
  "socket2 0.5.6",
  "tokio",
@@ -6814,13 +6829,13 @@ dependencies = [
 
 [[package]]
 name = "madsim-etcd-client"
-version = "0.4.0+0.12.1"
+version = "0.6.0+0.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "02b4b5de48bb7f3f7eae0bca62b3ed0b7d714b1b273d7347329b92c3a2eef113"
+checksum = "8edcf23498cb590e415ce2ba6c7f186c7aa3340e7aa716ddddb34faf0a9ffdfb"
 dependencies = [
  "etcd-client",
  "futures-util",
- "http 0.2.9",
+ "http 1.1.0",
  "madsim",
  "serde",
  "serde_with 3.8.0",
@@ -6828,7 +6843,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "toml 0.8.12",
- "tonic 0.10.2",
+ "tonic 0.12.1",
  "tracing",
 ]
 
@@ -6881,29 +6896,29 @@ dependencies = [
 
 [[package]]
 name = "madsim-tonic"
-version = "0.4.1+0.10.0"
+version = "0.5.1+0.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "813977c7870103e113a0332d97731f961bc48aaa8860edd318ef7d7754214436"
+checksum = "61c668c82f0c2aca7ffed3235047f2539e6e41278c7c47a822999f3b7a067887"
 dependencies = [
  "async-stream",
  "chrono",
  "futures-util",
  "madsim",
  "tokio",
- "tonic 0.10.2",
+ "tonic 0.12.1",
  "tower",
  "tracing",
 ]
 
 [[package]]
 name = "madsim-tonic-build"
-version = "0.4.2+0.10.0"
+version = "0.5.0+0.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4a2ad2776ba20221ccbe4e136e2fa0f7ab90eebd608373177f3e74a198a288ec"
+checksum = "f271a476bbaa9d2139e1e1a5beb869c6119e805a0b67ad2b2857e4a8785b111a"
 dependencies = [
  "prettyplease 0.2.15",
  "proc-macro2",
- "prost-build 0.12.1",
+ "prost-build 0.13.1",
  "quote",
  "syn 2.0.66",
  "tonic-build",
@@ -7932,7 +7947,7 @@ dependencies = [
  "rand",
  "thiserror",
  "tokio",
- "tokio-stream",
+ "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -7985,20 +8000,20 @@ dependencies = [
 [[package]]
 name = "otlp-embedded"
 version = "0.0.1"
-source = "git+https://github.com/risingwavelabs/otlp-embedded?rev=492c244e0be91feb659c0cd48a624bbd96045a33#492c244e0be91feb659c0cd48a624bbd96045a33"
+source = "git+https://github.com/risingwavelabs/otlp-embedded?rev=e6cd165b9bc85783b42c106e99186b86b73e3507#e6cd165b9bc85783b42c106e99186b86b73e3507"
 dependencies = [
  "axum 0.7.4",
  "datasize",
  "hex",
- "itertools 0.12.1",
- "madsim-tonic",
- "madsim-tonic-build",
- "prost 0.12.1",
+ "itertools 0.13.0",
+ "prost 0.13.1",
  "rust-embed",
  "schnellru",
  "serde",
  "serde_json",
  "tokio",
+ "tonic 0.12.1",
+ "tonic-build",
  "tracing",
 ]
 
@@ -8935,6 +8950,16 @@ dependencies = [
  "prost-derive 0.12.1",
 ]
 
+[[package]]
+name = "prost"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc"
+dependencies = [
+ "bytes",
+ "prost-derive 0.13.1",
+]
+
 [[package]]
 name = "prost-build"
 version = "0.11.9"
@@ -8979,6 +9004,27 @@ dependencies = [
  "which",
 ]
 
+[[package]]
+name = "prost-build"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1"
+dependencies = [
+ "bytes",
+ "heck 0.5.0",
+ "itertools 0.13.0",
+ "log",
+ "multimap 0.10.0",
+ "once_cell",
+ "petgraph",
+ "prettyplease 0.2.15",
+ "prost 0.13.1",
+ "prost-types 0.13.1",
+ "regex",
+ "syn 2.0.66",
+ "tempfile",
+]
+
 [[package]]
 name = "prost-derive"
 version = "0.11.9"
@@ -9005,6 +9051,19 @@ dependencies = [
  "syn 2.0.66",
 ]
 
+[[package]]
+name = "prost-derive"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca"
+dependencies = [
+ "anyhow",
+ "itertools 0.13.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.66",
+]
+
 [[package]]
 name = "prost-helpers"
 version = "0.1.0"
@@ -9016,13 +9075,13 @@ dependencies = [
 
 [[package]]
 name = "prost-reflect"
-version = "0.13.0"
+version = "0.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ae9372e3227f3685376a0836e5c248611eafc95a0be900d44bc6cdf225b700f"
+checksum = "55a6a9143ae25c25fa7b6a48d6cc08b10785372060009c25140a4e7c340e95af"
 dependencies = [
  "once_cell",
- "prost 0.12.1",
- "prost-types 0.12.1",
+ "prost 0.13.1",
+ "prost-types 0.13.1",
 ]
 
 [[package]]
@@ -9043,6 +9102,15 @@ dependencies = [
  "prost 0.12.1",
 ]
 
+[[package]]
+name = "prost-types"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2"
+dependencies = [
+ "prost 0.13.1",
+]
+
 [[package]]
 name = "protobuf"
 version = "2.28.0"
@@ -9160,7 +9228,7 @@ dependencies = [
  "indoc",
  "libc",
  "memoffset",
- "parking_lot 0.11.2",
+ "parking_lot 0.12.1",
  "portable-atomic",
  "pyo3-build-config",
  "pyo3-ffi",
@@ -9623,7 +9691,7 @@ dependencies = [
  "http 1.1.0",
  "http-body 1.0.0",
  "http-body-util",
- "hyper 1.1.0",
+ "hyper 1.4.1",
  "hyper-rustls 0.26.0",
  "hyper-tls 0.6.0",
  "hyper-util",
@@ -9813,7 +9881,7 @@ dependencies = [
  "bytes",
  "itertools 0.12.1",
  "parking_lot 0.12.1",
- "prost 0.12.1",
+ "prost 0.13.1",
  "risingwave_common",
  "risingwave_hummock_sdk",
  "risingwave_meta_model_v2",
@@ -9855,7 +9923,7 @@ dependencies = [
  "parquet 52.0.0",
  "paste",
  "prometheus",
- "prost 0.12.1",
+ "prost 0.13.1",
  "rand",
  "risingwave_common",
  "risingwave_common_estimate_size",
@@ -9875,7 +9943,7 @@ dependencies = [
  "thiserror-ext",
  "tikv-jemallocator",
  "tokio-metrics",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "tokio-util",
  "tracing",
  "twox-hash",
@@ -9918,7 +9986,7 @@ dependencies = [
  "serde",
  "serde_yaml",
  "thiserror-ext",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "toml 0.8.12",
  "tracing",
  "tracing-subscriber",
@@ -10024,7 +10092,7 @@ dependencies = [
  "governor",
  "hashbrown 0.14.3",
  "hex",
- "http 0.2.9",
+ "http 1.1.0",
  "http-body 0.4.5",
  "humantime",
  "hytra",
@@ -10050,7 +10118,7 @@ dependencies = [
  "pretty_assertions",
  "procfs 0.16.0",
  "prometheus",
- "prost 0.12.1",
+ "prost 0.13.1",
  "rand",
  "regex",
  "reqwest 0.12.4",
@@ -10132,14 +10200,18 @@ dependencies = [
 name = "risingwave_common_metrics"
 version = "1.11.0-alpha"
 dependencies = [
+ "auto_impl",
  "bytes",
  "clap",
  "darwin-libproc",
  "easy-ext",
  "futures",
  "http 0.2.9",
- "http-body 0.4.5",
+ "http 1.1.0",
+ "http-body 1.0.0",
  "hyper 0.14.27",
+ "hyper 1.4.1",
+ "hyper-util",
  "hytra",
  "itertools 0.12.1",
  "libc",
@@ -10180,7 +10252,7 @@ dependencies = [
  "anyhow",
  "bincode 1.3.3",
  "parking_lot 0.12.1",
- "prost 0.12.1",
+ "prost 0.13.1",
  "risingwave_pb",
  "serde",
  "thiserror",
@@ -10195,7 +10267,7 @@ dependencies = [
  "async-trait",
  "axum 0.7.4",
  "futures",
- "hyper 0.14.27",
+ "http 1.1.0",
  "madsim-tokio",
  "madsim-tonic",
  "prometheus",
@@ -10249,7 +10321,7 @@ dependencies = [
  "madsim-tokio",
  "madsim-tonic",
  "parking_lot 0.12.1",
- "prost 0.12.1",
+ "prost 0.13.1",
  "risingwave_common",
  "risingwave_common_heap_profiling",
  "risingwave_common_service",
@@ -10275,14 +10347,15 @@ dependencies = [
  "foyer",
  "futures",
  "futures-async-stream",
- "hyper 0.14.27",
+ "http 1.1.0",
+ "hyper 1.4.1",
  "itertools 0.12.1",
  "madsim-tokio",
  "madsim-tonic",
  "maplit",
  "pprof",
  "prometheus",
- "prost 0.12.1",
+ "prost 0.13.1",
  "rand",
  "risingwave_batch",
  "risingwave_common",
@@ -10301,7 +10374,7 @@ dependencies = [
  "tempfile",
  "thiserror-ext",
  "tikv-jemalloc-ctl",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "tower",
  "tracing",
  "uuid",
@@ -10388,10 +10461,10 @@ dependencies = [
  "postgres-openssl",
  "pretty_assertions",
  "prometheus",
- "prost 0.12.1",
+ "prost 0.13.1",
  "prost-build 0.12.1",
  "prost-reflect",
- "prost-types 0.12.1",
+ "prost-types 0.13.1",
  "protobuf-native",
  "protobuf-src",
  "pulsar",
@@ -10431,7 +10504,7 @@ dependencies = [
  "time",
  "tokio-postgres",
  "tokio-retry",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "tokio-util",
  "tracing",
  "tracing-subscriber",
@@ -10489,7 +10562,7 @@ dependencies = [
  "madsim-tokio",
  "madsim-tonic",
  "memcomparable",
- "prost 0.12.1",
+ "prost 0.13.1",
  "regex",
  "risingwave_common",
  "risingwave_connector",
@@ -10729,7 +10802,7 @@ dependencies = [
  "pretty-xmlish",
  "pretty_assertions",
  "prometheus",
- "prost 0.12.1",
+ "prost 0.13.1",
  "rand",
  "risingwave_batch",
  "risingwave_common",
@@ -10756,7 +10829,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "thiserror-ext",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "tracing",
  "uuid",
  "workspace-hack",
@@ -10781,7 +10854,7 @@ dependencies = [
  "hex",
  "itertools 0.12.1",
  "parse-display",
- "prost 0.12.1",
+ "prost 0.13.1",
  "risingwave_common",
  "risingwave_common_estimate_size",
  "risingwave_pb",
@@ -10839,7 +10912,7 @@ dependencies = [
  "madsim-tokio",
  "mockall",
  "parking_lot 0.12.1",
- "prost 0.12.1",
+ "prost 0.13.1",
  "risingwave_common",
  "risingwave_hummock_sdk",
  "risingwave_pb",
@@ -10858,7 +10931,7 @@ dependencies = [
  "futures",
  "jni",
  "madsim-tokio",
- "prost 0.12.1",
+ "prost 0.13.1",
  "risingwave_common",
  "risingwave_expr",
  "risingwave_hummock_sdk",
@@ -10888,7 +10961,7 @@ dependencies = [
  "jni",
  "madsim-tokio",
  "paste",
- "prost 0.12.1",
+ "prost 0.13.1",
  "risingwave_common",
  "risingwave_expr",
  "risingwave_hummock_sdk",
@@ -10955,7 +11028,7 @@ dependencies = [
  "function_name",
  "futures",
  "hex",
- "hyper 0.14.27",
+ "http 1.1.0",
  "itertools 0.12.1",
  "jsonbb",
  "madsim-etcd-client",
@@ -10970,7 +11043,7 @@ dependencies = [
  "parking_lot 0.12.1",
  "prometheus",
  "prometheus-http-query",
- "prost 0.12.1",
+ "prost 0.13.1",
  "rand",
  "risingwave_backup",
  "risingwave_common",
@@ -10997,7 +11070,7 @@ dependencies = [
  "thiserror",
  "thiserror-ext",
  "tokio-retry",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "tower",
  "tower-http",
  "tracing",
@@ -11043,7 +11116,7 @@ dependencies = [
 name = "risingwave_meta_model_v2"
 version = "1.11.0-alpha"
 dependencies = [
- "prost 0.12.1",
+ "prost 0.13.1",
  "risingwave_common",
  "risingwave_hummock_sdk",
  "risingwave_pb",
@@ -11097,7 +11170,7 @@ dependencies = [
  "itertools 0.12.1",
  "madsim-tokio",
  "madsim-tonic",
- "prost 0.12.1",
+ "prost 0.13.1",
  "rand",
  "regex",
  "risingwave_common",
@@ -11110,7 +11183,7 @@ dependencies = [
  "serde_json",
  "sync-point",
  "thiserror-ext",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "tracing",
  "workspace-hack",
 ]
@@ -11162,8 +11235,8 @@ dependencies = [
  "madsim-tonic-build",
  "pbjson",
  "pbjson-build",
- "prost 0.12.1",
- "prost-build 0.12.1",
+ "prost 0.13.1",
+ "prost-build 0.13.1",
  "prost-helpers",
  "risingwave_error",
  "serde",
@@ -11218,8 +11291,8 @@ dependencies = [
  "easy-ext",
  "either",
  "futures",
- "http 0.2.9",
- "hyper 0.14.27",
+ "http 1.1.0",
+ "hyper 1.4.1",
  "itertools 0.12.1",
  "lru 0.7.6",
  "madsim-tokio",
@@ -11236,7 +11309,7 @@ dependencies = [
  "thiserror",
  "thiserror-ext",
  "tokio-retry",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "tower",
  "tracing",
  "url",
@@ -11322,7 +11395,7 @@ dependencies = [
  "tempfile",
  "tikv-jemallocator",
  "tokio-postgres",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "tracing",
  "tracing-subscriber",
 ]
@@ -11390,7 +11463,7 @@ dependencies = [
  "serde",
  "serde_with 3.8.0",
  "tokio-postgres",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "toml 0.8.12",
  "tracing",
  "workspace-hack",
@@ -11437,7 +11510,7 @@ dependencies = [
  "parking_lot 0.12.1",
  "procfs 0.16.0",
  "prometheus",
- "prost 0.12.1",
+ "prost 0.13.1",
  "rand",
  "risingwave_backup",
  "risingwave_common",
@@ -11506,7 +11579,7 @@ dependencies = [
  "pin-project",
  "prehash",
  "prometheus",
- "prost 0.12.1",
+ "prost 0.13.1",
  "rand",
  "risingwave_common",
  "risingwave_common_estimate_size",
@@ -11531,7 +11604,7 @@ dependencies = [
  "thiserror-ext",
  "tokio-metrics",
  "tokio-retry",
- "tokio-stream",
+ "tokio-stream 0.1.15 (git+https://github.com/madsim-rs/tokio.git?rev=0dd1055)",
  "tracing",
  "tracing-test",
  "workspace-hack",
@@ -12077,9 +12150,9 @@ dependencies = [
 
 [[package]]
 name = "schnellru"
-version = "0.2.1"
+version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "772575a524feeb803e5b0fcbc6dd9f367e579488197c94c6e4023aad2305774d"
+checksum = "c9a8ef13a93c54d20580de1e5c413e624e53121d42fc7e2c11d10ef7f8b02367"
 dependencies = [
  "ahash 0.8.11",
  "cfg-if",
@@ -13122,7 +13195,7 @@ dependencies = [
  "thiserror",
  "time",
  "tokio",
- "tokio-stream",
+ "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
  "tracing",
  "url",
  "uuid",
@@ -13876,7 +13949,7 @@ dependencies = [
  "futures-util",
  "pin-project-lite",
  "tokio",
- "tokio-stream",
+ "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -13970,8 +14043,19 @@ dependencies = [
 
 [[package]]
 name = "tokio-stream"
-version = "0.1.14"
-source = "git+https://github.com/madsim-rs/tokio.git?rev=fe39bb8e#fe39bb8e8ab0ed96ee1b4477ab5508c20ce017fb"
+version = "0.1.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
+dependencies = [
+ "futures-core",
+ "pin-project-lite",
+ "tokio",
+]
+
+[[package]]
+name = "tokio-stream"
+version = "0.1.15"
+source = "git+https://github.com/madsim-rs/tokio.git?rev=0dd1055#0dd105567b323c863c29f794d2221ed588956d8d"
 dependencies = [
  "futures-core",
  "madsim-tokio",
@@ -14074,12 +14158,11 @@ dependencies = [
  "axum 0.6.20",
  "base64 0.21.7",
  "bytes",
- "flate2",
  "h2 0.3.26",
  "http 0.2.9",
  "http-body 0.4.5",
  "hyper 0.14.27",
- "hyper-timeout",
+ "hyper-timeout 0.4.1",
  "percent-encoding",
  "pin-project",
  "prost 0.12.1",
@@ -14087,12 +14170,11 @@ dependencies = [
  "rustls-pemfile 1.0.4",
  "tokio",
  "tokio-rustls 0.24.1",
- "tokio-stream",
+ "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
  "tower",
  "tower-layer",
  "tower-service",
  "tracing",
- "webpki-roots 0.25.2",
 ]
 
 [[package]]
@@ -14110,27 +14192,61 @@ dependencies = [
  "http 0.2.9",
  "http-body 0.4.5",
  "hyper 0.14.27",
- "hyper-timeout",
+ "hyper-timeout 0.4.1",
  "percent-encoding",
  "pin-project",
  "prost 0.12.1",
  "tokio",
- "tokio-stream",
+ "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
  "tower",
  "tower-layer",
  "tower-service",
  "tracing",
 ]
 
+[[package]]
+name = "tonic"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401"
+dependencies = [
+ "async-stream",
+ "async-trait",
+ "axum 0.7.4",
+ "base64 0.22.0",
+ "bytes",
+ "flate2",
+ "h2 0.4.4",
+ "http 1.1.0",
+ "http-body 1.0.0",
+ "http-body-util",
+ "hyper 1.4.1",
+ "hyper-timeout 0.5.1",
+ "hyper-util",
+ "percent-encoding",
+ "pin-project",
+ "prost 0.13.1",
+ "rustls-pemfile 2.1.1",
+ "socket2 0.5.6",
+ "tokio",
+ "tokio-rustls 0.26.0",
+ "tokio-stream 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tower",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+ "webpki-roots 0.26.1",
+]
+
 [[package]]
 name = "tonic-build"
-version = "0.10.2"
+version = "0.12.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889"
+checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964"
 dependencies = [
  "prettyplease 0.2.15",
  "proc-macro2",
- "prost-build 0.12.1",
+ "prost-build 0.13.1",
  "quote",
  "syn 2.0.66",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 9b07142dcf021..5bfab4feb27fb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -121,7 +121,7 @@ aws-smithy-types = { version = "1", default-features = false, features = [
 aws-endpoint = "0.60"
 aws-types = "1"
 axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
-etcd-client = { package = "madsim-etcd-client", version = "0.4" }
+etcd-client = { package = "madsim-etcd-client", version = "0.6" }
 futures-async-stream = "0.2.9"
 hytra = "0.1"
 rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [
@@ -129,11 +129,11 @@ rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [
 ] }
 hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] }
 criterion = { version = "0.5", features = ["async_futures"] }
-tonic = { package = "madsim-tonic", version = "0.4.1" }
-tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
-otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "492c244e0be91feb659c0cd48a624bbd96045a33" }
-prost = { version = "0.12" }
-prost-build = { version = "0.12" }
+tonic = { package = "madsim-tonic", version = "0.5.1" }
+tonic-build = { package = "madsim-tonic-build", version = "0.5" }
+otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "e6cd165b9bc85783b42c106e99186b86b73e3507" }
+prost = { version = "0.13" }
+prost-build = { version = "0.13" }
 icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "1860eb315183a5f3f72b4097c1e40d49407f8373", features = [
     "prometheus",
 ] }
@@ -180,6 +180,7 @@ tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git"
     "profiling",
     "stats",
 ], rev = "64a2d9" }
+# TODO(http-bump): bump to use tonic 0.12 once minitrace-opentelemetry is updated
 opentelemetry = "0.23"
 opentelemetry-otlp = "0.16"
 opentelemetry_sdk = { version = "0.23", default-features = false }
@@ -195,6 +196,7 @@ sea-orm = { version = "0.12.14", features = [
     "runtime-tokio-native-tls",
 ] }
 sqlx = "0.7"
+tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055", features = ["net", "fs"] }
 tokio-util = "0.7"
 tracing-opentelemetry = "0.24"
 rand = { version = "0.8", features = ["small_rng"] }
@@ -335,7 +337,9 @@ opt-level = 2
 # Patch third-party crates for deterministic simulation.
 quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
 getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" }
-tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e" }
+# Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies.
+# Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`.
+# tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055" }
 tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
 tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" }
 futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" }
diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml
index b230e0381d147..4b1954ff5ae2c 100644
--- a/ci/docker-compose.yml
+++ b/ci/docker-compose.yml
@@ -266,7 +266,6 @@ services:
       SCHEMA_REGISTRY_HOST_NAME: schemaregistry
       SCHEMA_REGISTRY_LISTENERS: http://schemaregistry:8082
       SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: message_queue:29092
-      SCHEMA_REGISTRY_DEBUG: 'true'
 
   pulsar-server:
     container_name: pulsar-server
diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt
index 0abd242e3c79d..5f032ba32f8dc 100644
--- a/e2e_test/sink/kafka/protobuf.slt
+++ b/e2e_test/sink/kafka/protobuf.slt
@@ -201,25 +201,19 @@ format plain encode protobuf (
   message = 'recursive.AllTypes');
 
 statement ok
-drop sink sink_upsert;
+drop table from_kafka cascade;
 
 statement ok
-drop sink sink_csr_nested;
+drop table from_kafka_csr_trivial cascade;
 
 statement ok
-drop sink sink_csr_trivial;
+drop table from_kafka_csr_nested cascade;
 
 statement ok
-drop sink sink0;
+drop table from_kafka_raw cascade;
 
 statement ok
-drop table into_kafka;
-
-statement ok
-drop table from_kafka_raw;
+drop table into_kafka cascade;
 
 system ok
 rpk topic delete test-rw-sink-upsert-protobuf
-
-statement ok
-drop table from_kafka;
diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml
index 099ae9019afcf..403eb864229d3 100644
--- a/src/batch/Cargo.toml
+++ b/src/batch/Cargo.toml
@@ -63,7 +63,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
     "fs",
 ] }
 tokio-metrics = "0.3.0"
-tokio-stream = "0.1"
+tokio-stream = { workspace = true }
 tokio-util = { workspace = true }
 tonic = { workspace = true }
 tracing = "0.1"
diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs
index d69d4fbc8b174..00073217f7ead 100644
--- a/src/batch/src/executor/hash_agg.rs
+++ b/src/batch/src/executor/hash_agg.rs
@@ -20,7 +20,6 @@ use bytes::Bytes;
 use futures_async_stream::try_stream;
 use hashbrown::hash_map::Entry;
 use itertools::Itertools;
-use prost::Message;
 use risingwave_common::array::{DataChunk, StreamChunk};
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{Field, Schema};
@@ -35,6 +34,7 @@ use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::HashAggNode;
 use risingwave_pb::data::DataChunk as PbDataChunk;
+use risingwave_pb::Message;
 
 use crate::error::{BatchError, Result};
 use crate::executor::aggregation::build as build_agg;
diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs
index f5ad5ab5ed984..1068ffd7f3349 100644
--- a/src/batch/src/executor/join/distributed_lookup_join.rs
+++ b/src/batch/src/executor/join/distributed_lookup_join.rs
@@ -354,10 +354,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
         let pk_prefix = OwnedRow::new(scan_range.eq_conds);
 
         if self.lookup_prefix_len == self.table.pk_indices().len() {
-            let row = self
-                .table
-                .get_row(&pk_prefix, self.epoch.clone().into())
-                .await?;
+            let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;
 
             if let Some(row) = row {
                 self.row_list.push(row);
@@ -366,7 +363,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
             let iter = self
                 .table
                 .batch_iter_with_pk_bounds(
-                    self.epoch.clone().into(),
+                    self.epoch.into(),
                     &pk_prefix,
                     ..,
                     false,
diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs
index 026f03fb65deb..3bfb583d6459d 100644
--- a/src/batch/src/executor/join/hash_join.rs
+++ b/src/batch/src/executor/join/hash_join.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
 use bytes::Bytes;
 use futures_async_stream::try_stream;
 use itertools::Itertools;
-use prost::Message;
 use risingwave_common::array::{Array, DataChunk, RowRef};
 use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
 use risingwave_common::catalog::Schema;
@@ -34,6 +33,7 @@ use risingwave_common_estimate_size::EstimateSize;
 use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::data::DataChunk as PbDataChunk;
+use risingwave_pb::Message;
 
 use super::{ChunkedData, JoinType, RowId};
 use crate::error::{BatchError, Result};
diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs
index 7fcaba71a9c3b..a3be00fc39a22 100644
--- a/src/batch/src/executor/join/local_lookup_join.rs
+++ b/src/batch/src/executor/join/local_lookup_join.rs
@@ -134,7 +134,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
                     ..Default::default()
                 }),
             }),
-            epoch: Some(self.epoch.clone()),
+            epoch: Some(self.epoch),
             tracing_context: TracingContext::from_current_span().to_protobuf(),
         };
 
@@ -237,7 +237,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
             &plan_node,
             &task_id,
             self.context.clone(),
-            self.epoch.clone(),
+            self.epoch,
             self.shutdown_rx.clone(),
         );
 
diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs
index 3a64901c64a04..80dc57b4f3620 100644
--- a/src/batch/src/executor/mod.rs
+++ b/src/batch/src/executor/mod.rs
@@ -174,7 +174,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
             plan_node,
             self.task_id,
             self.context.clone(),
-            self.epoch.clone(),
+            self.epoch,
             self.shutdown_rx.clone(),
         )
     }
@@ -188,7 +188,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
     }
 
     pub fn epoch(&self) -> BatchQueryEpoch {
-        self.epoch.clone()
+        self.epoch
     }
 }
 
diff --git a/src/batch/src/executor/order_by.rs b/src/batch/src/executor/order_by.rs
index 3f8c8e106c78f..ad7cc13992346 100644
--- a/src/batch/src/executor/order_by.rs
+++ b/src/batch/src/executor/order_by.rs
@@ -17,7 +17,6 @@ use std::sync::Arc;
 use bytes::Bytes;
 use futures_async_stream::try_stream;
 use itertools::Itertools;
-use prost::Message;
 use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::Schema;
 use risingwave_common::memory::MemoryContext;
@@ -28,6 +27,7 @@ use risingwave_common::util::sort_util::ColumnOrder;
 use risingwave_common_estimate_size::EstimateSize;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::data::DataChunk as PbDataChunk;
+use risingwave_pb::Message;
 
 use super::{
     BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs
index b8287147c6750..b897dbd813787 100644
--- a/src/batch/src/executor/row_seq_scan.rs
+++ b/src/batch/src/executor/row_seq_scan.rs
@@ -237,7 +237,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
 
         let ordered = seq_scan_node.ordered;
 
-        let epoch = source.epoch.clone();
+        let epoch = source.epoch;
         let limit = seq_scan_node.limit;
         let as_of = seq_scan_node
             .as_of
@@ -341,8 +341,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
         for point_get in point_gets {
             let table = table.clone();
             if let Some(row) =
-                Self::execute_point_get(table, point_get, query_epoch.clone(), histogram.clone())
-                    .await?
+                Self::execute_point_get(table, point_get, query_epoch, histogram.clone()).await?
             {
                 if let Some(chunk) = data_chunk_builder.append_one_row(row) {
                     returned += chunk.cardinality() as u64;
@@ -373,7 +372,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
                 table.clone(),
                 range,
                 ordered,
-                query_epoch.clone(),
+                query_epoch,
                 chunk_size,
                 limit,
                 histogram.clone(),
diff --git a/src/batch/src/spill/spill_op.rs b/src/batch/src/spill/spill_op.rs
index 237ee3baf0099..b3e842a269ec7 100644
--- a/src/batch/src/spill/spill_op.rs
+++ b/src/batch/src/spill/spill_op.rs
@@ -22,9 +22,9 @@ use futures_util::AsyncReadExt;
 use opendal::layers::RetryLayer;
 use opendal::services::{Fs, Memory};
 use opendal::Operator;
-use prost::Message;
 use risingwave_common::array::DataChunk;
 use risingwave_pb::data::DataChunk as PbDataChunk;
+use risingwave_pb::Message;
 use thiserror_ext::AsReport;
 use tokio::sync::Mutex;
 use twox_hash::XxHash64;
diff --git a/src/batch/src/task/broadcast_channel.rs b/src/batch/src/task/broadcast_channel.rs
index d66eda7d7d620..9781e38e7d7f6 100644
--- a/src/batch/src/task/broadcast_channel.rs
+++ b/src/batch/src/task/broadcast_channel.rs
@@ -86,7 +86,7 @@ pub fn new_broadcast_channel(
     output_channel_size: usize,
 ) -> (ChanSenderImpl, Vec<ChanReceiverImpl>) {
     let broadcast_info = match shuffle.distribution {
-        Some(exchange_info::Distribution::BroadcastInfo(ref v)) => v.clone(),
+        Some(exchange_info::Distribution::BroadcastInfo(ref v)) => *v,
         _ => BroadcastInfo::default(),
     };
 
diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs
index 4536dad1c031f..7186ced55febd 100644
--- a/src/batch/src/task/task_execution.rs
+++ b/src/batch/src/task/task_execution.rs
@@ -393,7 +393,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
                 self.plan.root.as_ref().unwrap(),
                 &self.task_id,
                 self.context.clone(),
-                self.epoch.clone(),
+                self.epoch,
                 self.shutdown_rx.clone(),
             )
             .build(),
diff --git a/src/bench/Cargo.toml b/src/bench/Cargo.toml
index d451ef46ef838..43451ebaeb9d1 100644
--- a/src/bench/Cargo.toml
+++ b/src/bench/Cargo.toml
@@ -50,7 +50,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
     "time",
     "signal",
 ] }
-tokio-stream = "0.1"
+tokio-stream = { workspace = true }
 toml = "0.8"
 tracing = "0.1"
 tracing-subscriber = "0.3.17"
diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml
index a117dce645ae6..2cc1d81f1a38d 100644
--- a/src/common/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -55,7 +55,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
 governor = { version = "0.6", default-features = false, features = ["std"] }
 hashbrown = "0.14"
 hex = "0.4.3"
-http = "0.2"
+http = "1"
 humantime = "2.1"
 hytra = { workspace = true }
 itertools = { workspace = true }
diff --git a/src/common/common_service/Cargo.toml b/src/common/common_service/Cargo.toml
index cb43702f3f9e6..87206ab7cbc1d 100644
--- a/src/common/common_service/Cargo.toml
+++ b/src/common/common_service/Cargo.toml
@@ -18,7 +18,7 @@ normal = ["workspace-hack"]
 async-trait = "0.1"
 axum = { workspace = true }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
-hyper = "0.14" # required by tonic
+http = "1"
 prometheus = { version = "0.13" }
 risingwave_common = { workspace = true }
 risingwave_pb = { workspace = true }
diff --git a/src/common/common_service/src/tracing.rs b/src/common/common_service/src/tracing.rs
index 3ee4a64231c29..de6f43bbf33f3 100644
--- a/src/common/common_service/src/tracing.rs
+++ b/src/common/common_service/src/tracing.rs
@@ -15,8 +15,8 @@
 use std::task::{Context, Poll};
 
 use futures::Future;
-use hyper::Body;
 use risingwave_common::util::tracing::TracingContext;
+use tonic::body::BoxBody;
 use tower::{Layer, Service};
 use tracing::Instrument;
 
@@ -49,9 +49,9 @@ pub struct TracingExtract<S> {
     inner: S,
 }
 
-impl<S> Service<hyper::Request<Body>> for TracingExtract<S>
+impl<S> Service<http::Request<BoxBody>> for TracingExtract<S>
 where
-    S: Service<hyper::Request<Body>> + Clone + Send + 'static,
+    S: Service<http::Request<BoxBody>> + Clone + Send + 'static,
     S::Future: Send + 'static,
 {
     type Error = S::Error;
@@ -63,7 +63,7 @@ where
         self.inner.poll_ready(cx)
     }
 
-    fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
+    fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
         // This is necessary because tonic internally uses `tower::buffer::Buffer`.
         // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
         // for details on why this is necessary
diff --git a/src/common/metrics/Cargo.toml b/src/common/metrics/Cargo.toml
index 4f3e8b20936b2..0c32b557cebb2 100644
--- a/src/common/metrics/Cargo.toml
+++ b/src/common/metrics/Cargo.toml
@@ -15,12 +15,16 @@ ignored = ["workspace-hack"]
 normal = ["workspace-hack"]
 
 [dependencies]
+auto_impl = "1"
 bytes = "1"
 clap = { workspace = true }
 easy-ext = "1"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
-http = "0.2"
-hyper = { version = "0.14", features = ["client"] }                           # used by tonic
+http = "1"
+http-02 = { package = "http", version = "0.2" }
+hyper = { version = "1" }
+hyper-014 = { package = "hyper", version = "0.14" }
+hyper-util = { version = "0.1", features = ["client-legacy"] }
 hytra = { workspace = true }
 itertools = { workspace = true }
 parking_lot = { workspace = true }
@@ -32,13 +36,13 @@ serde = { version = "1", features = ["derive"] }
 thiserror-ext = { workspace = true }
 tokio = { version = "0.2", package = "madsim-tokio" }
 tonic = { workspace = true }
+tower-layer = "0.3.2"
+tower-service = "0.3.2"
 tracing = "0.1"
 tracing-subscriber = "0.3.17"
 
 [target.'cfg(not(madsim))'.dependencies]
-http-body = "0.4.5"
-tower-layer = "0.3.2"
-tower-service = "0.3.2"
+http-body = "1"
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = { version = "0.16", default-features = false }
 libc = "0.2"
diff --git a/src/common/metrics/src/monitor/connection.rs b/src/common/metrics/src/monitor/connection.rs
index e5774a3f16d7d..aa7c8c8d4baa3 100644
--- a/src/common/metrics/src/monitor/connection.rs
+++ b/src/common/metrics/src/monitor/connection.rs
@@ -24,10 +24,9 @@ use std::time::Duration;
 
 use futures::FutureExt;
 use http::Uri;
-use hyper::client::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name};
-use hyper::client::connect::Connection;
-use hyper::client::HttpConnector;
-use hyper::service::Service;
+use hyper_util::client::legacy::connect::dns::{GaiAddrs, GaiFuture, GaiResolver, Name};
+use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector};
+use hyper_util::rt::TokioIo;
 use itertools::Itertools;
 use pin_project_lite::pin_project;
 use prometheus::{
@@ -37,11 +36,13 @@ use prometheus::{
 use thiserror_ext::AsReport;
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
 use tonic::transport::{Channel, Endpoint};
+use tower_service::Service;
 use tracing::{debug, info, warn};
 
 use crate::monitor::GLOBAL_METRICS_REGISTRY;
 use crate::{register_guarded_int_counter_vec_with_registry, LabelGuardedIntCounterVec};
 
+#[auto_impl::auto_impl(&mut)]
 pub trait MonitorAsyncReadWrite {
     fn on_read(&mut self, _size: usize) {}
     fn on_eof(&mut self) {}
@@ -74,6 +75,14 @@ impl<C, M> MonitoredConnection<C, M> {
         let this = this.project();
         (this.inner, this.monitor)
     }
+
+    /// Delegate async read/write traits between tokio and hyper.
+    fn hyper_tokio_delegate(
+        self: Pin<&mut Self>,
+    ) -> TokioIo<MonitoredConnection<TokioIo<Pin<&mut C>>, &mut M>> {
+        let (inner, monitor) = MonitoredConnection::project_into(self);
+        TokioIo::new(MonitoredConnection::new(TokioIo::new(inner), monitor))
+    }
 }
 
 impl<C: AsyncRead, M: MonitorAsyncReadWrite> AsyncRead for MonitoredConnection<C, M> {
@@ -112,6 +121,16 @@ impl<C: AsyncRead, M: MonitorAsyncReadWrite> AsyncRead for MonitoredConnection<C
     }
 }
 
+impl<C: hyper::rt::Read, M: MonitorAsyncReadWrite> hyper::rt::Read for MonitoredConnection<C, M> {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: hyper::rt::ReadBufCursor<'_>,
+    ) -> Poll<Result<(), std::io::Error>> {
+        hyper::rt::Read::poll_read(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf)
+    }
+}
+
 impl<C: AsyncWrite, M: MonitorAsyncReadWrite> AsyncWrite for MonitoredConnection<C, M> {
     fn poll_write(
         self: Pin<&mut Self>,
@@ -186,8 +205,41 @@ impl<C: AsyncWrite, M: MonitorAsyncReadWrite> AsyncWrite for MonitoredConnection
     }
 }
 
+impl<C: hyper::rt::Write, M: MonitorAsyncReadWrite> hyper::rt::Write for MonitoredConnection<C, M> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<Result<usize, std::io::Error>> {
+        hyper::rt::Write::poll_write(std::pin::pin!(self.hyper_tokio_delegate()), cx, buf)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
+        hyper::rt::Write::poll_flush(std::pin::pin!(self.hyper_tokio_delegate()), cx)
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), std::io::Error>> {
+        hyper::rt::Write::poll_shutdown(std::pin::pin!(self.hyper_tokio_delegate()), cx)
+    }
+
+    fn is_write_vectored(&self) -> bool {
+        self.inner.is_write_vectored()
+    }
+
+    fn poll_write_vectored(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        bufs: &[std::io::IoSlice<'_>],
+    ) -> Poll<Result<usize, std::io::Error>> {
+        hyper::rt::Write::poll_write_vectored(std::pin::pin!(self.hyper_tokio_delegate()), cx, bufs)
+    }
+}
+
 impl<C: Connection, M> Connection for MonitoredConnection<C, M> {
-    fn connected(&self) -> hyper::client::connect::Connected {
+    fn connected(&self) -> Connected {
         self.inner.connected()
     }
 }
@@ -275,6 +327,58 @@ where
     }
 }
 
+// Compatibility implementation for hyper 0.14 ecosystem.
+// Should be the same as those with imports from `http::Uri` and `hyper_util::client::legacy`.
+// TODO(http-bump): remove this after there is no more dependency on hyper 0.14.
+mod compat {
+    use http_02::Uri;
+    use hyper_014::client::connect::{Connected, Connection};
+
+    use super::*;
+
+    impl<C: Service<Uri>, M: MonitorNewConnection + Clone + 'static> Service<Uri>
+        for MonitoredConnection<C, M>
+    where
+        C::Future: 'static,
+    {
+        type Error = C::Error;
+        type Response = MonitoredConnection<C::Response, M::ConnectionMonitor>;
+
+        type Future = impl Future<Output = Result<Self::Response, Self::Error>> + 'static;
+
+        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            let ret = self.inner.poll_ready(cx);
+            if let Poll::Ready(Err(_)) = &ret {
+                self.monitor.on_err("<poll_ready>".to_string());
+            }
+            ret
+        }
+
+        fn call(&mut self, uri: Uri) -> Self::Future {
+            let endpoint = format!("{:?}", uri.host());
+            let monitor = self.monitor.clone();
+            self.inner
+                .call(uri)
+                .map(move |result: Result<_, _>| match result {
+                    Ok(resp) => Ok(MonitoredConnection::new(
+                        resp,
+                        monitor.new_connection_monitor(endpoint),
+                    )),
+                    Err(e) => {
+                        monitor.on_err(endpoint);
+                        Err(e)
+                    }
+                })
+        }
+    }
+
+    impl<C: Connection, M> Connection for MonitoredConnection<C, M> {
+        fn connected(&self) -> Connected {
+            self.inner.connected()
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct ConnectionMetrics {
     connection_count: IntGaugeVec,
@@ -534,18 +638,16 @@ impl<L> tonic::transport::server::Router<L> {
         signal: impl Future<Output = ()>,
     ) -> impl Future<Output = ()>
     where
-        L: tower_layer::Layer<tonic::transport::server::Routes>,
-        L::Service: Service<
-                http::request::Request<hyper::Body>,
-                Response = http::response::Response<ResBody>,
-            > + Clone
+        L: tower_layer::Layer<tonic::service::Routes>,
+        L::Service: Service<http::Request<tonic::body::BoxBody>, Response = http::Response<ResBody>>
+            + Clone
             + Send
             + 'static,
-        <<L as tower_layer::Layer<tonic::transport::server::Routes>>::Service as Service<
-            http::request::Request<hyper::Body>,
+        <<L as tower_layer::Layer<tonic::service::Routes>>::Service as Service<
+            http::Request<tonic::body::BoxBody>,
         >>::Future: Send + 'static,
-        <<L as tower_layer::Layer<tonic::transport::server::Routes>>::Service as Service<
-            http::request::Request<hyper::Body>,
+        <<L as tower_layer::Layer<tonic::service::Routes>>::Service as Service<
+            http::Request<tonic::body::BoxBody>,
         >>::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
         ResBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
         ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs
index aea08a1b74352..328287708a8a2 100644
--- a/src/common/src/vnode_mapping/vnode_placement.rs
+++ b/src/common/src/vnode_mapping/vnode_placement.rs
@@ -231,7 +231,7 @@ mod tests {
         let worker_1 = WorkerNode {
             id: 1,
             parallelism: 1,
-            property: Some(serving_property.clone()),
+            property: Some(serving_property),
             ..Default::default()
         };
 
@@ -246,7 +246,7 @@ mod tests {
         let worker_2 = WorkerNode {
             id: 2,
             parallelism: 50,
-            property: Some(serving_property.clone()),
+            property: Some(serving_property),
             ..Default::default()
         };
 
diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml
index a3f74792982f2..ed1758029092b 100644
--- a/src/compute/Cargo.toml
+++ b/src/compute/Cargo.toml
@@ -23,7 +23,8 @@ either = "1"
 foyer = { workspace = true }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 futures-async-stream = { workspace = true }
-hyper = "0.14" # required by tonic
+http = "1"
+hyper = "1"
 itertools = { workspace = true }
 maplit = "1.0.2"
 pprof = { version = "0.13", features = ["flamegraph"] }
@@ -54,7 +55,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
     "signal",
     "fs",
 ] }
-tokio-stream = "0.1"
+tokio-stream = { workspace = true }
 tonic = { workspace = true }
 tower = { version = "0.4", features = ["util", "load-shed"] }
 tracing = "0.1"
diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs
index a9a41d753ac96..0acc30e0c2430 100644
--- a/src/compute/src/rpc/service/monitor_service.rs
+++ b/src/compute/src/rpc/service/monitor_service.rs
@@ -389,8 +389,7 @@ pub mod grpc_middleware {
 
     use either::Either;
     use futures::Future;
-    use hyper::Body;
-    use tonic::transport::NamedService;
+    use tonic::body::BoxBody;
     use tower::{Layer, Service};
 
     /// Manages the await-trees of `gRPC` requests that are currently served by the compute node.
@@ -438,10 +437,9 @@ pub mod grpc_middleware {
         next_id: Arc<AtomicU64>,
     }
 
-    impl<S> Service<hyper::Request<Body>> for AwaitTreeMiddleware<S>
+    impl<S> Service<http::Request<BoxBody>> for AwaitTreeMiddleware<S>
     where
-        S: Service<hyper::Request<Body>> + Clone + Send + 'static,
-        S::Future: Send + 'static,
+        S: Service<http::Request<BoxBody>> + Clone,
     {
         type Error = S::Error;
         type Response = S::Response;
@@ -452,7 +450,7 @@ pub mod grpc_middleware {
             self.inner.poll_ready(cx)
         }
 
-        fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
+        fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
             let Some(registry) = self.registry.clone() else {
                 return Either::Left(self.inner.call(req));
             };
@@ -479,7 +477,8 @@ pub mod grpc_middleware {
         }
     }
 
-    impl<S: NamedService> NamedService for AwaitTreeMiddleware<S> {
+    #[cfg(not(madsim))]
+    impl<S: tonic::server::NamedService> tonic::server::NamedService for AwaitTreeMiddleware<S> {
         const NAME: &'static str = S::NAME;
     }
 }
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index b46fc15bb605b..38e82ccdf76ea 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -61,10 +61,10 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
 futures-async-stream = { workspace = true }
 gcp-bigquery-client = "0.18.0"
 glob = "0.3"
-google-cloud-bigquery = { version = "0.9.0", features = ["auth"] }
-google-cloud-gax = "0.17.0"
-google-cloud-googleapis = { version = "0.13", features = ["pubsub", "bigquery"] }
-google-cloud-pubsub = "0.25"
+google-cloud-bigquery = { version = "0.12.0", features = ["auth"] }
+google-cloud-gax = "0.19.0"
+google-cloud-googleapis = { version = "0.15", features = ["pubsub", "bigquery"] }
+google-cloud-pubsub = "0.28"
 http = "0.2"
 iceberg = { workspace = true }
 iceberg-catalog-rest = { workspace = true }
@@ -100,8 +100,8 @@ pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal",
 postgres-openssl = "0.5.0"
 prometheus = { version = "0.13", features = ["process"] }
 prost = { workspace = true, features = ["no-recursion-limit"] }
-prost-reflect = "0.13"
-prost-types = "0.12"
+prost-reflect = "0.14"
+prost-types = "0.13"
 protobuf-native = "0.2.2"
 pulsar = { version = "6.3", default-features = false, features = [
     "tokio-runtime",
@@ -172,7 +172,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
 ] }
 tokio-postgres = { version = "0.7", features = ["with-uuid-1"] }
 tokio-retry = "0.3"
-tokio-stream = "0.1"
+tokio-stream = { workspace = true }
 tokio-util = { workspace = true, features = ["codec", "io"] }
 tonic = { workspace = true }
 tracing = "0.1"
diff --git a/src/error/src/tonic.rs b/src/error/src/tonic.rs
index 4e3476c460fd6..f17b6b8ea9d44 100644
--- a/src/error/src/tonic.rs
+++ b/src/error/src/tonic.rs
@@ -244,7 +244,7 @@ mod tests {
         };
 
         let server_status = original.to_status(tonic::Code::Internal, "test");
-        let body = server_status.to_http();
+        let body = server_status.into_http();
         let client_status = tonic::Status::from_header_map(body.headers()).unwrap();
 
         let wrapper = TonicStatusWrapper::new(client_status);
diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml
index a9c955f91dcca..e493037c200b7 100644
--- a/src/expr/impl/Cargo.toml
+++ b/src/expr/impl/Cargo.toml
@@ -44,7 +44,7 @@ educe = "0.6"
 fancy-regex = "0.13"
 futures-async-stream = { workspace = true }
 futures-util = "0.3"
-ginepro = "0.7"
+ginepro = "0.7" # TODO(http-bump): bump to 0.8 once arrow-udf switches to tonic 0.12
 hex = "0.4"
 icelake = { workspace = true }
 itertools = { workspace = true }
@@ -71,7 +71,7 @@ sql-json-path = { version = "0.1", features = ["jsonbb"] }
 thiserror = "1"
 thiserror-ext = { workspace = true }
 tokio = { version = "0.2", package = "madsim-tokio", features = ["time"] }
-tonic = { version = "0.10", optional = true }
+tonic = { version = "0.10", optional = true } # TODO(http-bump): bump once arrow-udf switches to tonic 0.12
 tracing = "0.1"
 zstd = { version = "0.13", default-features = false, optional = true }
 
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index 89d29e076a38f..3a95eab660b09 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -93,7 +93,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
     "signal",
     "fs",
 ] }
-tokio-stream = "0.1"
+tokio-stream = { workspace = true }
 tonic = { workspace = true }
 tracing = "0.1"
 uuid = "1"
diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs
index 8e64a6db4e2b9..f453a89204501 100644
--- a/src/frontend/src/catalog/source_catalog.rs
+++ b/src/frontend/src/catalog/source_catalog.rs
@@ -114,12 +114,9 @@ impl From<&PbSource> for SourceCatalog {
         let owner = prost.owner;
         let watermark_descs = prost.get_watermark_descs().clone();
 
-        let associated_table_id = prost
-            .optional_associated_table_id
-            .clone()
-            .map(|id| match id {
-                OptionalAssociatedTableId::AssociatedTableId(id) => id,
-            });
+        let associated_table_id = prost.optional_associated_table_id.map(|id| match id {
+            OptionalAssociatedTableId::AssociatedTableId(id) => id,
+        });
         let version = prost.version;
 
         let connection_id = prost.connection_id;
diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs
index 5e32dcb7b2aba..432266d1871a9 100644
--- a/src/frontend/src/catalog/system_catalog/mod.rs
+++ b/src/frontend/src/catalog/system_catalog/mod.rs
@@ -240,7 +240,7 @@ fn get_acl_items(
 ) -> String {
     let mut res = String::from("{");
     let mut empty_flag = true;
-    let super_privilege = available_prost_privilege(object.clone(), for_dml_table);
+    let super_privilege = available_prost_privilege(*object, for_dml_table);
     for user in users {
         let privileges = if user.is_super {
             vec![&super_privilege]
diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs
index 80f6316f9ca90..b49abcb023429 100644
--- a/src/frontend/src/observer/observer_manager.rs
+++ b/src/frontend/src/observer/observer_manager.rs
@@ -472,8 +472,7 @@ impl FrontendObserverNode {
         match info {
             Info::HummockSnapshot(hummock_snapshot) => match resp.operation() {
                 Operation::Update => {
-                    self.hummock_snapshot_manager
-                        .update(hummock_snapshot.clone());
+                    self.hummock_snapshot_manager.update(*hummock_snapshot);
                 }
                 _ => panic!("receive an unsupported notify {:?}", resp),
             },
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index a6c76c78bf23d..e30dfa0dad377 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -196,7 +196,7 @@ impl StageExecution {
         match cur_state {
             Pending { msg_sender } => {
                 let runner = StageRunner {
-                    epoch: self.epoch.clone(),
+                    epoch: self.epoch,
                     stage: self.stage.clone(),
                     worker_node_manager: self.worker_node_manager.clone(),
                     tasks: self.tasks.clone(),
@@ -649,7 +649,7 @@ impl StageRunner {
             &plan_node,
             &task_id,
             self.ctx.to_batch_task_context(),
-            self.epoch.clone(),
+            self.epoch,
             shutdown_rx.clone(),
         );
 
@@ -935,7 +935,7 @@ impl StageRunner {
         let t_id = task_id.task_id;
 
         let stream_status: Fuse<Streaming<TaskInfoResponse>> = compute_client
-            .create_task(task_id, plan_fragment, self.epoch.clone(), expr_context)
+            .create_task(task_id, plan_fragment, self.epoch, expr_context)
             .await
             .inspect_err(|_| self.mask_failed_serving_worker(&worker))
             .map_err(|e| anyhow!(e))?
diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs
index 9d13573e03a7b..73a3ade5799b5 100644
--- a/src/frontend/src/scheduler/snapshot.rs
+++ b/src/frontend/src/scheduler/snapshot.rs
@@ -121,7 +121,7 @@ impl PinnedSnapshot {
 
 impl Drop for PinnedSnapshot {
     fn drop(&mut self) {
-        let _ = self.unpin_sender.send(Operation::Unpin(self.value.clone()));
+        let _ = self.unpin_sender.send(Operation::Unpin(self.value));
     }
 }
 
@@ -202,9 +202,7 @@ impl HummockSnapshotManager {
                 false
             } else {
                 // First tell the worker that a new snapshot is going to be pinned.
-                self.worker_sender
-                    .send(Operation::Pin(snapshot.clone()))
-                    .unwrap();
+                self.worker_sender.send(Operation::Pin(snapshot)).unwrap();
                 // Then set the latest snapshot.
                 *old_snapshot = Arc::new(PinnedSnapshot {
                     value: snapshot,
diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml
index b35ce9e73d96b..4511e9f61d894 100644
--- a/src/meta/Cargo.toml
+++ b/src/meta/Cargo.toml
@@ -36,7 +36,7 @@ flate2 = "1"
 function_name = "0.3.0"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 hex = "0.4"
-hyper = "0.14" # required by tonic
+http = "1"
 itertools = { workspace = true }
 jsonbb = { workspace = true }
 maplit = "1.0.2"
@@ -81,7 +81,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
   "signal",
 ] }
 tokio-retry = "0.3"
-tokio-stream = { version = "0.1", features = ["net"] }
+tokio-stream = { workspace = true }
 tonic = { workspace = true }
 tower = { version = "0.4", features = ["util", "load-shed"] }
 tracing = "0.1"
diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs
index 5c72b39bcd156..5d1b3570e1ce3 100644
--- a/src/meta/node/src/server.rs
+++ b/src/meta/node/src/server.rs
@@ -772,7 +772,7 @@ pub async fn start_service_as_election_leader(
         risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
     ]);
 
-    let server = tonic::transport::Server::builder()
+    let server_builder = tonic::transport::Server::builder()
         .layer(MetricsMiddlewareLayer::new(meta_metrics))
         .layer(TracingExtractLayer::new())
         .add_service(HeartbeatServiceServer::new(heartbeat_srv))
@@ -794,17 +794,19 @@ pub async fn start_service_as_election_leader(
         .add_service(ServingServiceServer::new(serving_srv))
         .add_service(CloudServiceServer::new(cloud_srv))
         .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv))
-        .add_service(EventLogServiceServer::new(event_log_srv))
-        .add_service(TraceServiceServer::new(trace_srv))
-        .monitored_serve_with_shutdown(
-            address_info.listen_addr,
-            "grpc-meta-leader-service",
-            TcpConfig {
-                tcp_nodelay: true,
-                keepalive_duration: None,
-            },
-            shutdown.clone().cancelled_owned(),
-        );
+        .add_service(EventLogServiceServer::new(event_log_srv));
+    #[cfg(not(madsim))] // `otlp-embedded` does not use madsim-patched tonic
+    let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv));
+
+    let server = server_builder.monitored_serve_with_shutdown(
+        address_info.listen_addr,
+        "grpc-meta-leader-service",
+        TcpConfig {
+            tcp_nodelay: true,
+            keepalive_duration: None,
+        },
+        shutdown.clone().cancelled_owned(),
+    );
     started::set();
     let _server_handle = tokio::spawn(server);
 
diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml
index 1e3330a2b53a0..69986f8570234 100644
--- a/src/meta/service/Cargo.toml
+++ b/src/meta/service/Cargo.toml
@@ -40,7 +40,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
     "time",
     "signal",
 ] }
-tokio-stream = { version = "0.1", features = ["net"] }
+tokio-stream = { workspace = true }
 tonic = { workspace = true }
 tracing = "0.1"
 
diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs
index 2594ccc123ff3..a3ee394e8f18f 100644
--- a/src/meta/service/src/ddl_service.rs
+++ b/src/meta/service/src/ddl_service.rs
@@ -907,7 +907,7 @@ impl DdlService for DdlServiceImpl {
         let req = request.into_inner();
 
         let table_id = req.get_table_id();
-        let parallelism = req.get_parallelism()?.clone();
+        let parallelism = *req.get_parallelism()?;
         let deferred = req.get_deferred();
 
         self.ddl_controller
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index b8b829ab9cddb..48e8fcbbc05c1 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -688,7 +688,7 @@ impl GlobalBarrierManager {
                                             r#type: node.r#type,
                                             host: node.host.clone(),
                                             parallelism: node.parallelism,
-                                            property: node.property.clone(),
+                                            property: node.property,
                                             resource: node.resource.clone(),
                                             ..Default::default()
                                         },
diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs
index 18643bc8c43c1..6a7ca826f160a 100644
--- a/src/meta/src/controller/cluster.rs
+++ b/src/meta/src/controller/cluster.rs
@@ -935,7 +935,7 @@ mod tests {
                     .add_worker(
                         PbWorkerType::ComputeNode,
                         host.clone(),
-                        property.clone(),
+                        property,
                         PbResource::default(),
                     )
                     .await?,
@@ -967,7 +967,7 @@ mod tests {
         );
 
         // re-register existing worker node with larger parallelism and change its serving mode.
-        let mut new_property = property.clone();
+        let mut new_property = property;
         new_property.worker_node_parallelism = (parallelism_num * 2) as _;
         new_property.is_serving = false;
         cluster_ctl
@@ -1021,7 +1021,7 @@ mod tests {
             .add_worker(
                 PbWorkerType::ComputeNode,
                 host.clone(),
-                property.clone(),
+                property,
                 PbResource::default(),
             )
             .await?;
diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs
index 0f42c17751377..c7c2057fcc96e 100644
--- a/src/meta/src/hummock/manager/commit_epoch.rs
+++ b/src/meta/src/hummock/manager/commit_epoch.rs
@@ -377,7 +377,7 @@ impl HummockManager {
             committed_epoch: epoch,
             current_epoch: epoch,
         };
-        let prev_snapshot = self.latest_snapshot.swap(snapshot.clone().into());
+        let prev_snapshot = self.latest_snapshot.swap(snapshot.into());
         assert!(prev_snapshot.committed_epoch < epoch);
         assert!(prev_snapshot.current_epoch < epoch);
 
diff --git a/src/meta/src/hummock/model/pinned_snapshot.rs b/src/meta/src/hummock/model/pinned_snapshot.rs
index f485d9dab7211..fd009e22b789f 100644
--- a/src/meta/src/hummock/model/pinned_snapshot.rs
+++ b/src/meta/src/hummock/model/pinned_snapshot.rs
@@ -28,7 +28,7 @@ impl MetadataModel for HummockPinnedSnapshot {
     }
 
     fn to_protobuf(&self) -> Self::PbType {
-        self.clone()
+        *self
     }
 
     fn from_protobuf(prost: Self::PbType) -> Self {
diff --git a/src/meta/src/hummock/model/pinned_version.rs b/src/meta/src/hummock/model/pinned_version.rs
index e8f6b2e65e75e..b2e7d97501b2b 100644
--- a/src/meta/src/hummock/model/pinned_version.rs
+++ b/src/meta/src/hummock/model/pinned_version.rs
@@ -28,7 +28,7 @@ impl MetadataModel for HummockPinnedVersion {
     }
 
     fn to_protobuf(&self) -> Self::PbType {
-        self.clone()
+        *self
     }
 
     fn from_protobuf(prost: Self::PbType) -> Self {
diff --git a/src/meta/src/manager/catalog/user.rs b/src/meta/src/manager/catalog/user.rs
index f6e2f9e03e835..81181b0fc1e17 100644
--- a/src/meta/src/manager/catalog/user.rs
+++ b/src/meta/src/manager/catalog/user.rs
@@ -234,7 +234,7 @@ mod tests {
             .grant_privilege(
                 &[test_sub_user_id],
                 &[make_privilege(
-                    object.clone(),
+                    object,
                     &[Action::Select, Action::Update, Action::Delete],
                     true,
                 )],
@@ -249,7 +249,7 @@ mod tests {
             .grant_privilege(
                 &[test_user_id],
                 &[make_privilege(
-                    object.clone(),
+                    object,
                     &[Action::Select, Action::Insert],
                     false,
                 )],
@@ -258,7 +258,7 @@ mod tests {
             .await?;
         let user = catalog_manager.get_user(test_user_id).await?;
         assert_eq!(user.grant_privileges.len(), 1);
-        assert_eq!(user.grant_privileges[0].object, Some(object.clone()));
+        assert_eq!(user.grant_privileges[0].object, Some(object));
         assert_eq!(user.grant_privileges[0].action_with_opts.len(), 2);
         assert!(user.grant_privileges[0]
             .action_with_opts
@@ -269,7 +269,7 @@ mod tests {
             .grant_privilege(
                 &[test_sub_user_id],
                 &[make_privilege(
-                    object.clone(),
+                    object,
                     &[Action::Select, Action::Insert],
                     true,
                 )],
@@ -284,7 +284,7 @@ mod tests {
             .grant_privilege(
                 &[test_user_id],
                 &[make_privilege(
-                    object.clone(),
+                    object,
                     &[Action::Select, Action::Insert],
                     true,
                 )],
@@ -293,7 +293,7 @@ mod tests {
             .await?;
         let user = catalog_manager.get_user(test_user_id).await?;
         assert_eq!(user.grant_privileges.len(), 1);
-        assert_eq!(user.grant_privileges[0].object, Some(object.clone()));
+        assert_eq!(user.grant_privileges[0].object, Some(object));
         assert_eq!(user.grant_privileges[0].action_with_opts.len(), 2);
         assert!(user.grant_privileges[0]
             .action_with_opts
@@ -304,7 +304,7 @@ mod tests {
             .grant_privilege(
                 &[test_sub_user_id],
                 &[make_privilege(
-                    object.clone(),
+                    object,
                     &[Action::Select, Action::Insert],
                     true,
                 )],
@@ -319,7 +319,7 @@ mod tests {
             .grant_privilege(
                 &[test_user_id],
                 &[make_privilege(
-                    object.clone(),
+                    object,
                     &[Action::Select, Action::Update, Action::Delete],
                     true,
                 )],
@@ -328,7 +328,7 @@ mod tests {
             .await?;
         let user = catalog_manager.get_user(test_user_id).await?;
         assert_eq!(user.grant_privileges.len(), 1);
-        assert_eq!(user.grant_privileges[0].object, Some(object.clone()));
+        assert_eq!(user.grant_privileges[0].object, Some(object));
         assert_eq!(user.grant_privileges[0].action_with_opts.len(), 4);
         assert!(user.grant_privileges[0]
             .action_with_opts
@@ -339,7 +339,7 @@ mod tests {
         let res = catalog_manager
             .revoke_privilege(
                 &[test_user_id],
-                &[make_privilege(object.clone(), &[Action::Connect], false)],
+                &[make_privilege(object, &[Action::Connect], false)],
                 0,
                 test_sub_user_id,
                 true,
@@ -355,11 +355,7 @@ mod tests {
         let res = catalog_manager
             .revoke_privilege(
                 &[test_user_id],
-                &[make_privilege(
-                    other_object.clone(),
-                    &[Action::Connect],
-                    false,
-                )],
+                &[make_privilege(other_object, &[Action::Connect], false)],
                 0,
                 test_sub_user_id,
                 true,
@@ -376,7 +372,7 @@ mod tests {
             .revoke_privilege(
                 &[test_user_id],
                 &[make_privilege(
-                    object.clone(),
+                    object,
                     &[
                         Action::Select,
                         Action::Insert,
@@ -401,7 +397,7 @@ mod tests {
             .revoke_privilege(
                 &[test_user_id],
                 &[make_privilege(
-                    object.clone(),
+                    object,
                     &[
                         Action::Select,
                         Action::Insert,
@@ -429,7 +425,7 @@ mod tests {
             .revoke_privilege(
                 &[test_user_id],
                 &[make_privilege(
-                    object.clone(),
+                    object,
                     &[Action::Select, Action::Insert, Action::Delete],
                     false,
                 )],
diff --git a/src/meta/src/rpc/intercept.rs b/src/meta/src/rpc/intercept.rs
index 8b5bb67f30943..87151e06b88a1 100644
--- a/src/meta/src/rpc/intercept.rs
+++ b/src/meta/src/rpc/intercept.rs
@@ -16,7 +16,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use futures::Future;
-use hyper::Body;
+use tonic::body::BoxBody;
 use tower::{Layer, Service};
 
 use crate::rpc::metrics::MetaMetrics;
@@ -49,9 +49,9 @@ pub struct MetricsMiddleware<S> {
     metrics: Arc<MetaMetrics>,
 }
 
-impl<S> Service<hyper::Request<Body>> for MetricsMiddleware<S>
+impl<S> Service<http::Request<BoxBody>> for MetricsMiddleware<S>
 where
-    S: Service<hyper::Request<Body>> + Clone + Send + 'static,
+    S: Service<http::Request<BoxBody>> + Clone + Send + 'static,
     S::Future: Send + 'static,
 {
     type Error = S::Error;
@@ -63,7 +63,7 @@ where
         self.inner.poll_ready(cx)
     }
 
-    fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
+    fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
         // This is necessary because tonic internally uses `tower::buffer::Buffer`.
         // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
         // for details on why this is necessary
diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml
index e821c2fc85090..fa433a30abcb4 100644
--- a/src/object_store/Cargo.toml
+++ b/src/object_store/Cargo.toml
@@ -26,7 +26,7 @@ crc32fast = "1"
 either = "1"
 fail = "0.5"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
-hyper = { version = "0.14", features = ["tcp", "client"] }                    # required by aws sdk
+hyper = { version = "0.14", features = ["tcp", "client"] }                    # TODO(http-bump): required by aws sdk
 hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] }
 hyper-tls = "0.5.0"
 itertools = { workspace = true }
diff --git a/src/prost/build.rs b/src/prost/build.rs
index bc4c6c413260f..e9974d6d87a09 100644
--- a/src/prost/build.rs
+++ b/src/prost/build.rs
@@ -131,7 +131,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         )
         .type_attribute("plan_common.GeneratedColumnDesc", "#[derive(Eq, Hash)]")
         .type_attribute("plan_common.DefaultColumnDesc", "#[derive(Eq, Hash)]")
-        .type_attribute("plan_common.Cardinality", "#[derive(Eq, Hash, Copy)]")
+        .type_attribute("plan_common.Cardinality", "#[derive(Eq, Hash)]")
         .type_attribute("plan_common.ExternalTableDesc", "#[derive(Eq, Hash)]")
         .type_attribute("plan_common.ColumnDesc", "#[derive(Eq, Hash)]")
         .type_attribute("plan_common.AdditionalColumn", "#[derive(Eq, Hash)]")
diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs
index 8ca7656a281fa..b8cab9006dea4 100644
--- a/src/prost/src/lib.rs
+++ b/src/prost/src/lib.rs
@@ -19,6 +19,7 @@
 
 use std::str::FromStr;
 
+pub use prost::Message;
 use risingwave_error::tonic::ToTonicStatus;
 use thiserror::Error;
 
diff --git a/src/risedevtool/Cargo.toml b/src/risedevtool/Cargo.toml
index b8a2ca9db14f9..71c2662024dbb 100644
--- a/src/risedevtool/Cargo.toml
+++ b/src/risedevtool/Cargo.toml
@@ -23,7 +23,7 @@ clap = { workspace = true }
 console = "0.15"
 fs-err = "2.11.0"
 glob = "0.3"
-google-cloud-pubsub = "0.25"
+google-cloud-pubsub = "0.28"
 indicatif = "0.17"
 itertools = { workspace = true }
 rdkafka = { workspace = true }
diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml
index 37064df273ed0..49729c6d9e8ac 100644
--- a/src/rpc_client/Cargo.toml
+++ b/src/rpc_client/Cargo.toml
@@ -19,8 +19,8 @@ async-trait = "0.1"
 easy-ext = "1"
 either = "1.13.0"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
-http = "0.2"
-hyper = "0.14" # required by tonic
+http = "1"
+hyper = "1"
 itertools = { workspace = true }
 lru = { workspace = true }
 moka = { version = "0.12", features = ["future"] }
@@ -43,7 +43,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
     "signal",
 ] }
 tokio-retry = "0.3"
-tokio-stream = "0.1"
+tokio-stream = { workspace = true }
 tonic = { workspace = true }
 tower = "0.4"
 tracing = "0.1"
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index 748e65d1f3af5..5a45e0752c9d8 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -245,7 +245,7 @@ impl MetaClient {
                 .add_worker_node(AddWorkerNodeRequest {
                     worker_type: worker_type as i32,
                     host: Some(addr.to_protobuf()),
-                    property: Some(property.clone()),
+                    property: Some(property),
                     resource: Some(risingwave_pb::common::worker_node::Resource {
                         rw_version: RW_VERSION.to_string(),
                         total_memory_bytes: system_memory_available_bytes() as _,
diff --git a/src/rpc_client/src/tracing.rs b/src/rpc_client/src/tracing.rs
index 50c98007bb9fd..aab07d43225d4 100644
--- a/src/rpc_client/src/tracing.rs
+++ b/src/rpc_client/src/tracing.rs
@@ -16,46 +16,22 @@ use std::task::{Context, Poll};
 
 use futures::Future;
 use risingwave_common::util::tracing::TracingContext;
-use tower::{Layer, Service};
-
-/// A layer that decorates the inner service with [`TracingInject`].
-#[derive(Clone, Default)]
-pub struct TracingInjectLayer {
-    _private: (),
-}
-
-impl TracingInjectLayer {
-    #[allow(dead_code)]
-    pub fn new() -> Self {
-        Self::default()
-    }
-}
-
-impl<S> Layer<S> for TracingInjectLayer {
-    type Service = TracingInject<S>;
-
-    fn layer(&self, service: S) -> Self::Service {
-        TracingInject { inner: service }
-    }
-}
+use tonic::body::BoxBody;
+use tower::Service;
 
 /// A service wrapper that injects the [`TracingContext`] obtained from the current tracing span
 /// into the HTTP headers of the request.
 ///
 /// See also `TracingExtract` in the `common_service` crate.
 #[derive(Clone, Debug)]
-pub struct TracingInject<S> {
-    inner: S,
+pub struct TracingInjectChannel {
+    inner: tonic::transport::Channel,
 }
 
-impl<S, B> Service<hyper::Request<B>> for TracingInject<S>
-where
-    S: Service<hyper::Request<B>> + Clone + Send + 'static,
-    S::Future: Send + 'static,
-    B: hyper::body::HttpBody, // tonic `Channel` uses `BoxBody` instead of `hyper::Body`
-{
-    type Error = S::Error;
-    type Response = S::Response;
+#[cfg(not(madsim))]
+impl Service<http::Request<BoxBody>> for TracingInjectChannel {
+    type Error = tonic::transport::Error;
+    type Response = http::Response<BoxBody>;
 
     type Future = impl Future<Output = Result<Self::Response, Self::Error>>;
 
@@ -63,7 +39,7 @@ where
         self.inner.poll_ready(cx)
     }
 
-    fn call(&mut self, mut req: hyper::Request<B>) -> Self::Future {
+    fn call(&mut self, mut req: http::Request<BoxBody>) -> Self::Future {
         // This is necessary because tonic internally uses `tower::buffer::Buffer`.
         // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
         // for details on why this is necessary
@@ -81,21 +57,21 @@ where
 /// A wrapper around tonic's `Channel` that injects the [`TracingContext`] obtained from the current
 /// tracing span when making gRPC requests.
 #[cfg(not(madsim))]
-pub type Channel = TracingInject<tonic::transport::Channel>;
+pub type Channel = TracingInjectChannel;
 #[cfg(madsim)]
 pub type Channel = tonic::transport::Channel;
 
-/// An extension trait for tonic's `Channel` that wraps it in a [`TracingInject`] service.
+/// An extension trait for tonic's `Channel` that wraps it into a [`TracingInjectChannel`].
 #[easy_ext::ext(TracingInjectedChannelExt)]
 impl tonic::transport::Channel {
-    /// Wraps the channel in a [`TracingInject`] service, so that the [`TracingContext`] obtained
+    /// Wraps the channel into a [`TracingInjectChannel`], so that the [`TracingContext`] obtained
     /// from the current tracing span is injected into the HTTP headers of the request.
     ///
     /// The server can then extract the [`TracingContext`] from the HTTP headers with the
     /// `TracingExtract` middleware.
     pub fn tracing_injected(self) -> Channel {
         #[cfg(not(madsim))]
-        return TracingInject { inner: self };
+        return TracingInjectChannel { inner: self };
         #[cfg(madsim)]
         return self;
     }
diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs
index 4c53af38eef67..8dfba1b62a181 100644
--- a/src/storage/backup/src/lib.rs
+++ b/src/storage/backup/src/lib.rs
@@ -122,7 +122,7 @@ impl From<&MetaSnapshotMetadata> for PbMetaSnapshotMetadata {
             state_table_info: m
                 .state_table_info
                 .iter()
-                .map(|(t, i)| (t.table_id, i.clone()))
+                .map(|(t, i)| (t.table_id, *i))
                 .collect(),
             rw_version: m.rw_version.clone(),
         }
diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
index d194f4355aa2b..1ee4fe0443783 100644
--- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
+++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
@@ -85,7 +85,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary
             }
             GroupDelta::GroupDestroy(destroy_delta) => {
                 assert!(group_destroy.is_none());
-                group_destroy = Some(destroy_delta.clone());
+                group_destroy = Some(*destroy_delta);
             }
             GroupDelta::GroupMetaChange(meta_delta) => {
                 group_meta_changes.push(meta_delta.clone());
diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs
index 1fbd26ed3485b..5894ed3e4a6e9 100644
--- a/src/storage/hummock_sdk/src/time_travel.rs
+++ b/src/storage/hummock_sdk/src/time_travel.rs
@@ -338,7 +338,7 @@ impl IncompleteHummockVersionDelta {
             state_table_info_delta: self
                 .state_table_info_delta
                 .iter()
-                .map(|(table_id, delta)| (table_id.table_id, delta.clone()))
+                .map(|(table_id, delta)| (table_id.table_id, *delta))
                 .collect(),
         }
     }
diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs
index 4ce5b89138f8e..e418250f0b6bf 100644
--- a/src/storage/hummock_sdk/src/version.rs
+++ b/src/storage/hummock_sdk/src/version.rs
@@ -77,7 +77,7 @@ impl HummockVersionStateTableInfo {
     pub fn from_protobuf(state_table_info: &HashMap<u32, PbStateTableInfo>) -> Self {
         let state_table_info = state_table_info
             .iter()
-            .map(|(table_id, info)| (TableId::new(*table_id), info.clone()))
+            .map(|(table_id, info)| (TableId::new(*table_id), *info))
             .collect();
         let compaction_group_member_tables =
             Self::build_compaction_group_member_tables(&state_table_info);
@@ -90,7 +90,7 @@ impl HummockVersionStateTableInfo {
     pub fn to_protobuf(&self) -> HashMap<u32, PbStateTableInfo> {
         self.state_table_info
             .iter()
-            .map(|(table_id, info)| (table_id.table_id, info.clone()))
+            .map(|(table_id, info)| (table_id.table_id, *info))
             .collect()
     }
 
@@ -642,7 +642,7 @@ impl From<&PbHummockVersionDelta> for HummockVersionDelta {
             state_table_info_delta: pb_version_delta
                 .state_table_info_delta
                 .iter()
-                .map(|(table_id, delta)| (TableId::new(*table_id), delta.clone()))
+                .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
                 .collect(),
         }
     }
@@ -679,7 +679,7 @@ impl From<&HummockVersionDelta> for PbHummockVersionDelta {
             state_table_info_delta: version_delta
                 .state_table_info_delta
                 .iter()
-                .map(|(table_id, delta)| (table_id.table_id, delta.clone()))
+                .map(|(table_id, delta)| (table_id.table_id, *delta))
                 .collect(),
         }
     }
@@ -716,7 +716,7 @@ impl From<HummockVersionDelta> for PbHummockVersionDelta {
             state_table_info_delta: version_delta
                 .state_table_info_delta
                 .into_iter()
-                .map(|(table_id, delta)| (table_id.table_id, delta.clone()))
+                .map(|(table_id, delta)| (table_id.table_id, delta))
                 .collect(),
         }
     }
@@ -761,7 +761,7 @@ impl From<PbHummockVersionDelta> for HummockVersionDelta {
             state_table_info_delta: pb_version_delta
                 .state_table_info_delta
                 .iter()
-                .map(|(table_id, delta)| (TableId::new(*table_id), delta.clone()))
+                .map(|(table_id, delta)| (TableId::new(*table_id), *delta))
                 .collect(),
         }
     }
@@ -938,7 +938,7 @@ impl From<&GroupDelta> for PbGroupDelta {
                 delta_type: Some(PbDeltaType::GroupConstruct(pb_group_construct.clone())),
             },
             GroupDelta::GroupDestroy(pb_group_destroy) => PbGroupDelta {
-                delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy.clone())),
+                delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)),
             },
             GroupDelta::GroupMetaChange(pb_group_meta_change) => PbGroupDelta {
                 delta_type: Some(PbDeltaType::GroupMetaChange(pb_group_meta_change.clone())),
@@ -960,7 +960,7 @@ impl From<&PbGroupDelta> for GroupDelta {
                 GroupDelta::GroupConstruct(pb_group_construct.clone())
             }
             Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => {
-                GroupDelta::GroupDestroy(pb_group_destroy.clone())
+                GroupDelta::GroupDestroy(*pb_group_destroy)
             }
             Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)) => {
                 GroupDelta::GroupMetaChange(pb_group_meta_change.clone())
diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml
index 3c85092a4d677..de25cf8439be1 100644
--- a/src/stream/Cargo.toml
+++ b/src/stream/Cargo.toml
@@ -79,7 +79,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
 ] }
 tokio-metrics = "0.3.0"
 tokio-retry = "0.3"
-tokio-stream = "0.1"
+tokio-stream = { workspace = true }
 tonic = { workspace = true }
 tracing = "0.1"
 
diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml
index 6d881f203f670..143d0f0c01c75 100644
--- a/src/tests/simulation/Cargo.toml
+++ b/src/tests/simulation/Cargo.toml
@@ -55,7 +55,7 @@ tempfile = "3"
 tikv-jemallocator = { workspace = true }
 tokio = { version = "0.2", package = "madsim-tokio" }
 tokio-postgres = "0.7"
-tokio-stream = "0.1"
+tokio-stream = { workspace = true }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 
diff --git a/src/tests/state_cleaning_test/Cargo.toml b/src/tests/state_cleaning_test/Cargo.toml
index a105360a68f6b..6c12898343951 100644
--- a/src/tests/state_cleaning_test/Cargo.toml
+++ b/src/tests/state_cleaning_test/Cargo.toml
@@ -24,7 +24,7 @@ serde = { version = "1", features = ["derive"] }
 serde_with = "3"
 tokio = { version = "0.2", package = "madsim-tokio" }
 tokio-postgres = "0.7"
-tokio-stream = { version = "0.1", features = ["fs"] }
+tokio-stream = { workspace = true }
 toml = "0.8"
 tracing = "0.1"
 
diff --git a/src/utils/runtime/Cargo.toml b/src/utils/runtime/Cargo.toml
index 0815a005df5b5..ff2902e7a4b4c 100644
--- a/src/utils/runtime/Cargo.toml
+++ b/src/utils/runtime/Cargo.toml
@@ -17,7 +17,7 @@ normal = ["workspace-hack"]
 [dependencies]
 await-tree = { workspace = true }
 console = "0.15"
-console-subscriber = "0.3.0"
+console-subscriber = "0.4"
 either = "1"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 hostname = "0.4"