Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rpc): prefer monitored_connect over connect #18635

Merged
merged 4 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ disallowed-methods = [
{ path = "speedate::DateTime::parse_bytes_with_config", reason = "Please use `parse_bytes_rfc3339_with_config` instead." },
{ path = "speedate::Date::parse_str", reason = "Please use `parse_str_rfc3339` instead." },
{ path = "speedate::Date::parse_bytes", reason = "Please use `parse_bytes_rfc3339` instead." },
{ path = "tonic::transport::Endpoint::connect", reason = "Please use `EndpointExt::monitored_connect` instead." },
{ path = "tonic::transport::Endpoint::connect_lazy", reason = "Please use `EndpointExt::monitored_connect_lazy` instead." },
]
disallowed-types = [
{ path = "num_traits::AsPrimitive", reason = "Please use `From` or `TryFrom` with `OrderedFloat` instead." },
Expand Down
10 changes: 10 additions & 0 deletions src/common/metrics/src/monitor/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,16 @@ pub struct TcpConfig {
pub keepalive_duration: Option<Duration>,
}

// The values below are the same ones `#[derive(Default)]` would produce (`false` / `None`),
// which is why the `derivable_impls` lint has to be silenced for this hand-written impl.
// NOTE(review): presumably kept explicit so the defaults are visible at a glance — confirm.
#[allow(clippy::derivable_impls)]
impl Default for TcpConfig {
    /// Returns a configuration with `tcp_nodelay` turned off and no keepalive duration set.
    fn default() -> Self {
        let tcp_nodelay = false;
        let keepalive_duration = None;
        Self {
            tcp_nodelay,
            keepalive_duration,
        }
    }
}

pub fn monitor_connector<C>(
connector: C,
connection_type: impl Into<String>,
Expand Down
3 changes: 2 additions & 1 deletion src/ctl/src/cmd_impl/hummock/tiered_cache_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::time::Duration;

use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::monitor::EndpointExt;
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::TieredCacheTracingRequest;
use tonic::transport::Endpoint;
Expand All @@ -40,7 +41,7 @@ pub async fn tiered_cache_tracing(
let addr = worker_node.get_host().unwrap();
let channel = Endpoint::from_shared(format!("http://{}:{}", addr.host, addr.port))?
.connect_timeout(Duration::from_secs(5))
.connect()
.monitored_connect("grpc-tiered-cache-tracing-client", Default::default())
.await?;
let mut client = MonitorServiceClient::new(channel);
client
Expand Down
15 changes: 8 additions & 7 deletions src/rpc_client/src/compactor_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;

use risingwave_common::monitor::EndpointExt;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::{
Expand Down Expand Up @@ -47,7 +48,7 @@ impl CompactorClient {
pub async fn new(host_addr: HostAddr) -> Result<Self> {
let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
.connect_timeout(Duration::from_secs(5))
.connect()
.monitored_connect("grpc-compactor-client", Default::default())
.await?;
Ok(Self {
monitor_client: MonitorServiceClient::new(channel),
Expand Down Expand Up @@ -94,26 +95,26 @@ pub struct GrpcCompactorProxyClient {
}

impl GrpcCompactorProxyClient {
pub fn new(channel: Channel, endpoint: String) -> Self {
pub async fn new(endpoint: String) -> Self {
let channel = Self::connect_to_endpoint(endpoint.clone()).await;
let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel)));
Self { core, endpoint }
}

async fn recreate_core(&self) {
tracing::info!("GrpcCompactorProxyClient rpc transfer failed, try to reconnect");
let channel = self.connect_to_endpoint().await;
let channel = Self::connect_to_endpoint(self.endpoint.clone()).await;
let mut core = self.core.write().await;
*core = GrpcCompactorProxyClientCore::new(channel);
}

async fn connect_to_endpoint(&self) -> Channel {
let endpoint =
Endpoint::from_shared(self.endpoint.clone()).expect("Fail to construct tonic Endpoint");
async fn connect_to_endpoint(endpoint: String) -> Channel {
let endpoint = Endpoint::from_shared(endpoint).expect("Fail to construct tonic Endpoint");
endpoint
.http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
.keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
.connect_timeout(Duration::from_secs(5))
.connect()
.monitored_connect("grpc-compactor-proxy-client", Default::default())
.await
.expect("Failed to create channel via proxy rpc endpoint.")
}
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl ComputeClient {
"grpc-compute-client",
TcpConfig {
tcp_nodelay: true,
keepalive_duration: None,
..Default::default()
},
)
.await?;
Expand Down
257 changes: 3 additions & 254 deletions src/rpc_client/src/connector_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::time::Duration;

use anyhow::{anyhow, Context};
use futures::TryStreamExt;
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE};
use risingwave_common::monitor::{EndpointExt, TcpConfig};
use risingwave_pb::connector_service::connector_service_client::ConnectorServiceClient;
use risingwave_pb::connector_service::sink_coordinator_stream_request::{
CommitMetadata, StartCoordinator,
};
use anyhow::anyhow;
use risingwave_pb::connector_service::sink_coordinator_stream_request::CommitMetadata;
use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload;
use risingwave_pb::connector_service::sink_writer_stream_request::{
Barrier, Request as SinkRequest, StartSink, WriteBatch,
Barrier, Request as SinkRequest, WriteBatch,
};
use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse;
use risingwave_pb::connector_service::*;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use thiserror_ext::AsReport;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::{Channel, Endpoint};
use tonic::Streaming;
use tracing::error;

use crate::error::{Result, RpcError};
use crate::{BidiStreamHandle, BidiStreamReceiver, BidiStreamSender};

#[derive(Clone, Debug)]
pub struct ConnectorClient {
rpc_client: ConnectorServiceClient<Channel>,
endpoint: String,
}

pub type SinkWriterRequestSender<REQ = SinkWriterStreamRequest> = BidiStreamSender<REQ>;
pub type SinkWriterResponseReceiver = BidiStreamReceiver<SinkWriterStreamResponse>;

Expand Down Expand Up @@ -143,232 +121,3 @@ impl SinkCoordinatorStreamHandle {
}
}
}

impl ConnectorClient {
pub async fn try_new(connector_endpoint: Option<&String>) -> Option<Self> {
match connector_endpoint {
None => None,
Some(connector_endpoint) => match ConnectorClient::new(connector_endpoint).await {
Ok(client) => Some(client),
Err(e) => {
error!(
endpoint = connector_endpoint,
error = %e.as_report(),
"invalid connector endpoint",
);
None
}
},
}
}

#[allow(clippy::unused_async)]
pub async fn new(connector_endpoint: &String) -> Result<Self> {
let endpoint = Endpoint::from_shared(format!("http://{}", connector_endpoint))
.with_context(|| format!("invalid connector endpoint `{}`", connector_endpoint))?
.initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
.initial_stream_window_size(STREAM_WINDOW_SIZE)
.connect_timeout(Duration::from_secs(5));

let channel = {
#[cfg(madsim)]
{
endpoint.connect().await?
}
#[cfg(not(madsim))]
{
endpoint.monitored_connect_lazy(
"grpc-connector-client",
TcpConfig {
tcp_nodelay: true,
keepalive_duration: None,
},
)
}
};
Ok(Self {
rpc_client: ConnectorServiceClient::new(channel).max_decoding_message_size(usize::MAX),
endpoint: connector_endpoint.to_string(),
})
}

pub fn endpoint(&self) -> &String {
&self.endpoint
}

/// Get source event stream
pub async fn start_source_stream(
&self,
source_id: u64,
source_type: SourceType,
start_offset: Option<String>,
properties: BTreeMap<String, String>,
snapshot_done: bool,
is_source_job: bool,
) -> Result<Streaming<GetEventStreamResponse>> {
Ok(self
.rpc_client
.clone()
.get_event_stream(GetEventStreamRequest {
source_id,
source_type: source_type as _,
start_offset: start_offset.unwrap_or_default(),
properties,
snapshot_done,
is_source_job,
})
.await
.inspect_err(|err| {
tracing::error!(
"failed to start stream for CDC source {}: {}",
source_id,
err.message()
)
})
.map_err(RpcError::from_connector_status)?
.into_inner())
}

/// Validate source properties
pub async fn validate_source_properties(
&self,
source_id: u64,
source_type: SourceType,
properties: BTreeMap<String, String>,
table_schema: Option<TableSchema>,
is_source_job: bool,
is_backfill_table: bool,
) -> Result<()> {
let table_schema = table_schema.map(|mut table_schema| {
table_schema.columns.retain(|c| {
!matches!(
c.generated_or_default_column,
Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
)
});
table_schema
});
let response = self
.rpc_client
.clone()
.validate_source(ValidateSourceRequest {
source_id,
source_type: source_type as _,
properties,
table_schema,
is_source_job,
is_backfill_table,
})
.await
.inspect_err(|err| {
tracing::error!("failed to validate source#{}: {}", source_id, err.message())
})
.map_err(RpcError::from_connector_status)?
.into_inner();

response.error.map_or(Ok(()), |err| {
Err(RpcError::Internal(anyhow!(format!(
"source cannot pass validation: {}",
err.error_message
))))
})
}

pub async fn start_sink_writer_stream(
&self,
payload_schema: Option<TableSchema>,
sink_proto: PbSinkParam,
) -> Result<SinkWriterStreamHandle> {
let mut rpc_client = self.rpc_client.clone();
let (handle, first_rsp) = SinkWriterStreamHandle::initialize(
SinkWriterStreamRequest {
request: Some(SinkRequest::Start(StartSink {
payload_schema,
sink_param: Some(sink_proto),
})),
},
|rx| async move {
rpc_client
.sink_writer_stream(ReceiverStream::new(rx))
.await
.map(|response| {
response
.into_inner()
.map_err(RpcError::from_connector_status)
})
.map_err(RpcError::from_connector_status)
},
)
.await?;

match first_rsp {
SinkWriterStreamResponse {
response: Some(sink_writer_stream_response::Response::Start(_)),
} => Ok(handle),
msg => Err(RpcError::Internal(anyhow!(
"should get start response but get {:?}",
msg
))),
}
}

pub async fn start_sink_coordinator_stream(
&self,
param: SinkParam,
) -> Result<SinkCoordinatorStreamHandle> {
let mut rpc_client = self.rpc_client.clone();
let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize(
SinkCoordinatorStreamRequest {
request: Some(sink_coordinator_stream_request::Request::Start(
StartCoordinator { param: Some(param) },
)),
},
|rx| async move {
rpc_client
.sink_coordinator_stream(ReceiverStream::new(rx))
.await
.map(|response| {
response
.into_inner()
.map_err(RpcError::from_connector_status)
})
.map_err(RpcError::from_connector_status)
},
)
.await?;

match first_rsp {
SinkCoordinatorStreamResponse {
response: Some(sink_coordinator_stream_response::Response::Start(_)),
} => Ok(handle),
msg => Err(RpcError::Internal(anyhow!(
"should get start response but get {:?}",
msg
))),
}
}

pub async fn validate_sink_properties(&self, sink_param: SinkParam) -> Result<()> {
let response = self
.rpc_client
.clone()
.validate_sink(ValidateSinkRequest {
sink_param: Some(sink_param),
})
.await
.inspect_err(|err| {
tracing::error!("failed to validate sink properties: {}", err.message())
})
.map_err(RpcError::from_connector_status)?
.into_inner();
response.error.map_or_else(
|| Ok(()), // If there is no error message, return Ok here.
|err| {
Err(RpcError::Internal(anyhow!(format!(
"sink cannot pass validation: {}",
err.error_message
))))
},
)
}
}
2 changes: 1 addition & 1 deletion src/rpc_client/src/frontend_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl FrontendClient {
"grpc-frontend-client",
TcpConfig {
tcp_nodelay: true,
keepalive_duration: None,
..Default::default()
},
)
.await?
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod tracing;

pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
pub use connector_client::{SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef};
pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient};
pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
Expand Down
Loading
Loading