Skip to content

Commit 766313c

Browse files
committed
retry with grpc errors
1 parent df65235 commit 766313c

File tree

3 files changed

+65
-38
lines changed

3 files changed

+65
-38
lines changed

src/meta/service/src/ddl_service.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -966,15 +966,16 @@ impl DdlService for DdlServiceImpl {
966966
);
967967
for table in tables {
968968
// send a request to the frontend to get the ReplaceTablePlan
969+
// will retry with exponential backoff if the request fails
969970
let resp = client
970971
.get_table_replace_plan(GetTableReplacePlanRequest {
971972
database_id: table.database_id,
972973
owner: table.owner,
973974
table_name: table.name,
974975
table_change: Some(table_change.clone()),
975976
})
976-
.await
977-
.map_err(MetaError::from)?;
977+
.await?
978+
.into_inner();
978979

979980
if let Some(plan) = resp.replace_plan {
980981
plan.table

src/rpc_client/src/frontend_client.rs

+61-18
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,20 @@ use risingwave_common::monitor::{EndpointExt, TcpConfig};
2121
use risingwave_common::util::addr::HostAddr;
2222
use risingwave_pb::frontend_service::frontend_service_client::FrontendServiceClient;
2323
use risingwave_pb::frontend_service::{GetTableReplacePlanRequest, GetTableReplacePlanResponse};
24+
use tokio_retry::strategy::{jitter, ExponentialBackoff};
2425
use tonic::transport::Endpoint;
26+
use tonic::Response;
2527

2628
use crate::error::Result;
2729
use crate::tracing::{Channel, TracingInjectedChannelExt};
28-
use crate::{frontend_rpc_client_method_impl, RpcClient, RpcClientPool};
30+
use crate::{RpcClient, RpcClientPool};
2931

30-
#[derive(Clone)]
31-
pub struct FrontendClient(FrontendServiceClient<Channel>);
32+
// Millisecond value passed to `ExponentialBackoff::from_millis` when the retry schedule is built.
const DEFAULT_RETRY_INTERVAL: u64 = 50;
33+
// Upper bound applied to each individual delay of the retry schedule.
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5);
34+
// Number of delays taken from the backoff iterator, i.e. the cap on how many retries are made.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 10;
3235

33-
#[async_trait]
34-
impl RpcClient for FrontendClient {
35-
async fn new_client(host_addr: HostAddr) -> Result<Self> {
36-
Self::new(host_addr).await
37-
}
38-
}
36+
// Private newtype around the generated `FrontendServiceClient` running over a `Channel`.
#[derive(Clone)]
37+
struct FrontendClient(FrontendServiceClient<Channel>);
3938

4039
impl FrontendClient {
4140
async fn new(host_addr: HostAddr) -> Result<Self> {
@@ -59,17 +58,61 @@ impl FrontendClient {
5958
}
6059

6160
// similar to the stream_client used in the Meta node
62-
pub type FrontendClientPool = RpcClientPool<FrontendClient>;
61+
// Pool of `FrontendRetryClient`s.
pub type FrontendClientPool = RpcClientPool<FrontendRetryClient>;
6362
// Shared (`Arc`) handle to a `FrontendClientPool`.
pub type FrontendClientPoolRef = Arc<FrontendClientPool>;
6463

65-
macro_rules! for_all_frontend_rpc {
66-
($macro:ident) => {
67-
$macro! {
68-
{ 0, get_table_replace_plan, GetTableReplacePlanRequest, GetTableReplacePlanResponse }
69-
}
70-
};
64+
// Implements `RpcClient` for `FrontendRetryClient` by delegating construction to
// `FrontendRetryClient::new`.
#[async_trait]
65+
impl RpcClient for FrontendRetryClient {
66+
async fn new_client(host_addr: HostAddr) -> Result<Self> {
67+
Self::new(host_addr).await
68+
}
7169
}
7270

73-
impl FrontendClient {
74-
for_all_frontend_rpc! { frontend_rpc_client_method_impl }
71+
// Frontend RPC client whose `get_table_replace_plan` call is retried on selected gRPC
// status codes.
#[derive(Clone)]
72+
pub struct FrontendRetryClient {
73+
// Underlying client that performs each individual attempt.
client: FrontendClient,
74+
}
75+
76+
impl FrontendRetryClient {
77+
async fn new(host_addr: HostAddr) -> Result<Self> {
78+
let client = FrontendClient::new(host_addr).await?;
79+
Ok(Self { client })
80+
}
81+
82+
#[inline(always)]
83+
fn get_retry_strategy() -> impl Iterator<Item = Duration> {
84+
ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
85+
.max_delay(DEFAULT_RETRY_MAX_DELAY)
86+
.take(DEFAULT_RETRY_MAX_ATTEMPTS)
87+
.map(jitter)
88+
}
89+
90+
fn should_retry(status: &tonic::Status) -> bool {
91+
if status.code() == tonic::Code::Unavailable
92+
|| status.code() == tonic::Code::Unknown
93+
|| status.code() == tonic::Code::Unauthenticated
94+
|| status.code() == tonic::Code::Aborted
95+
{
96+
return true;
97+
}
98+
false
99+
}
100+
101+
pub async fn get_table_replace_plan(
102+
&self,
103+
request: GetTableReplacePlanRequest,
104+
) -> std::result::Result<Response<GetTableReplacePlanResponse>, tonic::Status> {
105+
tokio_retry::RetryIf::spawn(
106+
Self::get_retry_strategy(),
107+
|| async {
108+
self.client
109+
.to_owned()
110+
.0
111+
.get_table_replace_plan(request.clone())
112+
.await
113+
},
114+
Self::should_retry,
115+
)
116+
.await
117+
}
75118
}

src/rpc_client/src/lib.rs

+1-18
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ mod tracing;
6666
pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
6767
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
6868
pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
69-
pub use frontend_client::{FrontendClient, FrontendClientPool, FrontendClientPoolRef};
69+
pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef};
7070
pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient};
7171
pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
7272
use rw_futures_util::await_future_with_monitor_error_stream;
@@ -210,23 +210,6 @@ macro_rules! meta_rpc_client_method_impl {
210210
}
211211
}
212212

213-
#[macro_export]
214-
macro_rules! frontend_rpc_client_method_impl {
215-
($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
216-
$(
217-
pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
218-
Ok(self
219-
.$client
220-
.to_owned()
221-
.$fn_name(request)
222-
.await
223-
.map_err($crate::error::RpcError::from_frontend_status)?
224-
.into_inner())
225-
}
226-
)*
227-
}
228-
}
229-
230213
pub const DEFAULT_BUFFER_SIZE: usize = 16;
231214

232215
pub struct BidiStreamSender<REQ> {

0 commit comments

Comments
 (0)