Skip to content

Commit 756d840

Browse files
committed
Revert "chore: bump tonic to v0.12 (#17889)"
This reverts commit e96c39d.
1 parent df80dd6 commit 756d840

File tree

59 files changed

+329
-513
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+329
-513
lines changed

Cargo.lock

+122-238
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+7-11
Original file line numberDiff line numberDiff line change
@@ -121,19 +121,19 @@ aws-smithy-types = { version = "1", default-features = false, features = [
121121
aws-endpoint = "0.60"
122122
aws-types = "1"
123123
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
124-
etcd-client = { package = "madsim-etcd-client", version = "0.6" }
124+
etcd-client = { package = "madsim-etcd-client", version = "0.4" }
125125
futures-async-stream = "0.2.9"
126126
hytra = "0.1"
127127
rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [
128128
"cmake-build",
129129
] }
130130
hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] }
131131
criterion = { version = "0.5", features = ["async_futures"] }
132-
tonic = { package = "madsim-tonic", version = "0.5.1" }
133-
tonic-build = { package = "madsim-tonic-build", version = "0.5" }
134-
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "e6cd165b9bc85783b42c106e99186b86b73e3507" }
135-
prost = { version = "0.13" }
136-
prost-build = { version = "0.13" }
132+
tonic = { package = "madsim-tonic", version = "0.4.1" }
133+
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
134+
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "492c244e0be91feb659c0cd48a624bbd96045a33" }
135+
prost = { version = "0.12" }
136+
prost-build = { version = "0.12" }
137137
icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "1860eb315183a5f3f72b4097c1e40d49407f8373", features = [
138138
"prometheus",
139139
] }
@@ -180,7 +180,6 @@ tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git"
180180
"profiling",
181181
"stats",
182182
], rev = "64a2d9" }
183-
# TODO(http-bump): bump to use tonic 0.12 once minitrace-opentelemetry is updated
184183
opentelemetry = "0.23"
185184
opentelemetry-otlp = "0.16"
186185
opentelemetry_sdk = { version = "0.23", default-features = false }
@@ -196,7 +195,6 @@ sea-orm = { version = "0.12.14", features = [
196195
"runtime-tokio-native-tls",
197196
] }
198197
sqlx = "0.7"
199-
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055", features = ["net", "fs"] }
200198
tokio-util = "0.7"
201199
tracing-opentelemetry = "0.24"
202200
rand = { version = "0.8", features = ["small_rng"] }
@@ -337,9 +335,7 @@ opt-level = 2
337335
# Patch third-party crates for deterministic simulation.
338336
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
339337
getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" }
340-
# Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies.
341-
# Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`.
342-
# tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055" }
338+
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e" }
343339
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
344340
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" }
345341
futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" }

ci/docker-compose.yml

+1
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ services:
266266
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
267267
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry:8082
268268
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: message_queue:29092
269+
SCHEMA_REGISTRY_DEBUG: 'true'
269270

270271
pulsar-server:
271272
container_name: pulsar-server

e2e_test/sink/kafka/protobuf.slt

+11-5
Original file line numberDiff line numberDiff line change
@@ -201,19 +201,25 @@ format plain encode protobuf (
201201
message = 'recursive.AllTypes');
202202

203203
statement ok
204-
drop table from_kafka cascade;
204+
drop sink sink_upsert;
205205

206206
statement ok
207-
drop table from_kafka_csr_trivial cascade;
207+
drop sink sink_csr_nested;
208208

209209
statement ok
210-
drop table from_kafka_csr_nested cascade;
210+
drop sink sink_csr_trivial;
211211

212212
statement ok
213-
drop table from_kafka_raw cascade;
213+
drop sink sink0;
214214

215215
statement ok
216-
drop table into_kafka cascade;
216+
drop table into_kafka;
217+
218+
statement ok
219+
drop table from_kafka_raw;
217220

218221
system ok
219222
rpk topic delete test-rw-sink-upsert-protobuf
223+
224+
statement ok
225+
drop table from_kafka;

src/batch/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
6363
"fs",
6464
] }
6565
tokio-metrics = "0.3.0"
66-
tokio-stream = { workspace = true }
66+
tokio-stream = "0.1"
6767
tokio-util = { workspace = true }
6868
tonic = { workspace = true }
6969
tracing = "0.1"

src/batch/src/executor/hash_agg.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use bytes::Bytes;
2020
use futures_async_stream::try_stream;
2121
use hashbrown::hash_map::Entry;
2222
use itertools::Itertools;
23+
use prost::Message;
2324
use risingwave_common::array::{DataChunk, StreamChunk};
2425
use risingwave_common::bitmap::Bitmap;
2526
use risingwave_common::catalog::{Field, Schema};
@@ -34,7 +35,6 @@ use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction
3435
use risingwave_pb::batch_plan::plan_node::NodeBody;
3536
use risingwave_pb::batch_plan::HashAggNode;
3637
use risingwave_pb::data::DataChunk as PbDataChunk;
37-
use risingwave_pb::Message;
3838

3939
use crate::error::{BatchError, Result};
4040
use crate::executor::aggregation::build as build_agg;

src/batch/src/executor/join/distributed_lookup_join.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,10 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
354354
let pk_prefix = OwnedRow::new(scan_range.eq_conds);
355355

356356
if self.lookup_prefix_len == self.table.pk_indices().len() {
357-
let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;
357+
let row = self
358+
.table
359+
.get_row(&pk_prefix, self.epoch.clone().into())
360+
.await?;
358361

359362
if let Some(row) = row {
360363
self.row_list.push(row);
@@ -363,7 +366,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
363366
let iter = self
364367
.table
365368
.batch_iter_with_pk_bounds(
366-
self.epoch.into(),
369+
self.epoch.clone().into(),
367370
&pk_prefix,
368371
..,
369372
false,

src/batch/src/executor/join/hash_join.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::sync::Arc;
2020
use bytes::Bytes;
2121
use futures_async_stream::try_stream;
2222
use itertools::Itertools;
23+
use prost::Message;
2324
use risingwave_common::array::{Array, DataChunk, RowRef};
2425
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
2526
use risingwave_common::catalog::Schema;
@@ -33,7 +34,6 @@ use risingwave_common_estimate_size::EstimateSize;
3334
use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression};
3435
use risingwave_pb::batch_plan::plan_node::NodeBody;
3536
use risingwave_pb::data::DataChunk as PbDataChunk;
36-
use risingwave_pb::Message;
3737

3838
use super::{ChunkedData, JoinType, RowId};
3939
use crate::error::{BatchError, Result};

src/batch/src/executor/join/local_lookup_join.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
134134
..Default::default()
135135
}),
136136
}),
137-
epoch: Some(self.epoch),
137+
epoch: Some(self.epoch.clone()),
138138
tracing_context: TracingContext::from_current_span().to_protobuf(),
139139
};
140140

@@ -237,7 +237,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
237237
&plan_node,
238238
&task_id,
239239
self.context.clone(),
240-
self.epoch,
240+
self.epoch.clone(),
241241
self.shutdown_rx.clone(),
242242
);
243243

src/batch/src/executor/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
174174
plan_node,
175175
self.task_id,
176176
self.context.clone(),
177-
self.epoch,
177+
self.epoch.clone(),
178178
self.shutdown_rx.clone(),
179179
)
180180
}
@@ -188,7 +188,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
188188
}
189189

190190
pub fn epoch(&self) -> BatchQueryEpoch {
191-
self.epoch
191+
self.epoch.clone()
192192
}
193193
}
194194

src/batch/src/executor/order_by.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::sync::Arc;
1717
use bytes::Bytes;
1818
use futures_async_stream::try_stream;
1919
use itertools::Itertools;
20+
use prost::Message;
2021
use risingwave_common::array::DataChunk;
2122
use risingwave_common::catalog::Schema;
2223
use risingwave_common::memory::MemoryContext;
@@ -27,7 +28,6 @@ use risingwave_common::util::sort_util::ColumnOrder;
2728
use risingwave_common_estimate_size::EstimateSize;
2829
use risingwave_pb::batch_plan::plan_node::NodeBody;
2930
use risingwave_pb::data::DataChunk as PbDataChunk;
30-
use risingwave_pb::Message;
3131

3232
use super::{
3333
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,

src/batch/src/executor/row_seq_scan.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
237237

238238
let ordered = seq_scan_node.ordered;
239239

240-
let epoch = source.epoch;
240+
let epoch = source.epoch.clone();
241241
let limit = seq_scan_node.limit;
242242
let as_of = seq_scan_node
243243
.as_of
@@ -341,7 +341,8 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
341341
for point_get in point_gets {
342342
let table = table.clone();
343343
if let Some(row) =
344-
Self::execute_point_get(table, point_get, query_epoch, histogram.clone()).await?
344+
Self::execute_point_get(table, point_get, query_epoch.clone(), histogram.clone())
345+
.await?
345346
{
346347
if let Some(chunk) = data_chunk_builder.append_one_row(row) {
347348
returned += chunk.cardinality() as u64;
@@ -372,7 +373,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
372373
table.clone(),
373374
range,
374375
ordered,
375-
query_epoch,
376+
query_epoch.clone(),
376377
chunk_size,
377378
limit,
378379
histogram.clone(),

src/batch/src/spill/spill_op.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ use futures_util::AsyncReadExt;
2222
use opendal::layers::RetryLayer;
2323
use opendal::services::{Fs, Memory};
2424
use opendal::Operator;
25+
use prost::Message;
2526
use risingwave_common::array::DataChunk;
2627
use risingwave_pb::data::DataChunk as PbDataChunk;
27-
use risingwave_pb::Message;
2828
use thiserror_ext::AsReport;
2929
use tokio::sync::Mutex;
3030
use twox_hash::XxHash64;

src/batch/src/task/broadcast_channel.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub fn new_broadcast_channel(
8686
output_channel_size: usize,
8787
) -> (ChanSenderImpl, Vec<ChanReceiverImpl>) {
8888
let broadcast_info = match shuffle.distribution {
89-
Some(exchange_info::Distribution::BroadcastInfo(ref v)) => *v,
89+
Some(exchange_info::Distribution::BroadcastInfo(ref v)) => v.clone(),
9090
_ => BroadcastInfo::default(),
9191
};
9292

src/batch/src/task/task_execution.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
393393
self.plan.root.as_ref().unwrap(),
394394
&self.task_id,
395395
self.context.clone(),
396-
self.epoch,
396+
self.epoch.clone(),
397397
self.shutdown_rx.clone(),
398398
)
399399
.build(),

src/bench/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
5050
"time",
5151
"signal",
5252
] }
53-
tokio-stream = { workspace = true }
53+
tokio-stream = "0.1"
5454
toml = "0.8"
5555
tracing = "0.1"
5656
tracing-subscriber = "0.3.17"

src/common/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
5555
governor = { version = "0.6", default-features = false, features = ["std"] }
5656
hashbrown = "0.14"
5757
hex = "0.4.3"
58-
http = "1"
58+
http = "0.2"
5959
humantime = "2.1"
6060
hytra = { workspace = true }
6161
itertools = { workspace = true }

src/common/common_service/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ normal = ["workspace-hack"]
1818
async-trait = "0.1"
1919
axum = { workspace = true }
2020
futures = { version = "0.3", default-features = false, features = ["alloc"] }
21-
http = "1"
21+
hyper = "0.14" # required by tonic
2222
prometheus = { version = "0.13" }
2323
risingwave_common = { workspace = true }
2424
risingwave_pb = { workspace = true }

src/common/common_service/src/tracing.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
use std::task::{Context, Poll};
1616

1717
use futures::Future;
18+
use hyper::Body;
1819
use risingwave_common::util::tracing::TracingContext;
19-
use tonic::body::BoxBody;
2020
use tower::{Layer, Service};
2121
use tracing::Instrument;
2222

@@ -49,9 +49,9 @@ pub struct TracingExtract<S> {
4949
inner: S,
5050
}
5151

52-
impl<S> Service<http::Request<BoxBody>> for TracingExtract<S>
52+
impl<S> Service<hyper::Request<Body>> for TracingExtract<S>
5353
where
54-
S: Service<http::Request<BoxBody>> + Clone + Send + 'static,
54+
S: Service<hyper::Request<Body>> + Clone + Send + 'static,
5555
S::Future: Send + 'static,
5656
{
5757
type Error = S::Error;
@@ -63,7 +63,7 @@ where
6363
self.inner.poll_ready(cx)
6464
}
6565

66-
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
66+
fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
6767
// This is necessary because tonic internally uses `tower::buffer::Buffer`.
6868
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
6969
// for details on why this is necessary

src/common/metrics/Cargo.toml

+5-9
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,12 @@ ignored = ["workspace-hack"]
1515
normal = ["workspace-hack"]
1616

1717
[dependencies]
18-
auto_impl = "1"
1918
bytes = "1"
2019
clap = { workspace = true }
2120
easy-ext = "1"
2221
futures = { version = "0.3", default-features = false, features = ["alloc"] }
23-
http = "1"
24-
http-02 = { package = "http", version = "0.2" }
25-
hyper = { version = "1" }
26-
hyper-014 = { package = "hyper", version = "0.14" }
27-
hyper-util = { version = "0.1", features = ["client-legacy"] }
22+
http = "0.2"
23+
hyper = { version = "0.14", features = ["client"] } # used by tonic
2824
hytra = { workspace = true }
2925
itertools = { workspace = true }
3026
parking_lot = { workspace = true }
@@ -36,13 +32,13 @@ serde = { version = "1", features = ["derive"] }
3632
thiserror-ext = { workspace = true }
3733
tokio = { version = "0.2", package = "madsim-tokio" }
3834
tonic = { workspace = true }
39-
tower-layer = "0.3.2"
40-
tower-service = "0.3.2"
4135
tracing = "0.1"
4236
tracing-subscriber = "0.3.17"
4337

4438
[target.'cfg(not(madsim))'.dependencies]
45-
http-body = "1"
39+
http-body = "0.4.5"
40+
tower-layer = "0.3.2"
41+
tower-service = "0.3.2"
4642
[target.'cfg(target_os = "linux")'.dependencies]
4743
procfs = { version = "0.16", default-features = false }
4844
libc = "0.2"

0 commit comments

Comments
 (0)