|
12 | 12 | // See the License for the specific language governing permissions and
|
13 | 13 | // limitations under the License.
|
14 | 14 |
|
15 |
| -use std::collections::BTreeMap; |
16 |
| -use std::fmt::Debug; |
17 |
| -use std::time::Duration; |
18 |
| - |
19 |
| -use anyhow::{anyhow, Context}; |
20 |
| -use futures::TryStreamExt; |
21 |
| -use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; |
22 |
| -use risingwave_common::monitor::{EndpointExt, TcpConfig}; |
23 |
| -use risingwave_pb::connector_service::connector_service_client::ConnectorServiceClient; |
24 |
| -use risingwave_pb::connector_service::sink_coordinator_stream_request::{ |
25 |
| - CommitMetadata, StartCoordinator, |
26 |
| -}; |
| 15 | +use anyhow::anyhow; |
| 16 | +use risingwave_pb::connector_service::sink_coordinator_stream_request::CommitMetadata; |
27 | 17 | use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload;
|
28 | 18 | use risingwave_pb::connector_service::sink_writer_stream_request::{
|
29 |
| - Barrier, Request as SinkRequest, StartSink, WriteBatch, |
| 19 | + Barrier, Request as SinkRequest, WriteBatch, |
30 | 20 | };
|
31 | 21 | use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse;
|
32 | 22 | use risingwave_pb::connector_service::*;
|
33 |
| -use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; |
34 |
| -use thiserror_ext::AsReport; |
35 |
| -use tokio_stream::wrappers::ReceiverStream; |
36 |
| -use tonic::transport::{Channel, Endpoint}; |
37 |
| -use tonic::Streaming; |
38 |
| -use tracing::error; |
39 | 23 |
|
40 | 24 | use crate::error::{Result, RpcError};
|
41 | 25 | use crate::{BidiStreamHandle, BidiStreamReceiver, BidiStreamSender};
|
42 | 26 |
|
43 |
| -#[derive(Clone, Debug)] |
44 |
| -pub struct ConnectorClient { |
45 |
| - rpc_client: ConnectorServiceClient<Channel>, |
46 |
| - endpoint: String, |
47 |
| -} |
48 |
| - |
49 | 27 | pub type SinkWriterRequestSender<REQ = SinkWriterStreamRequest> = BidiStreamSender<REQ>;
|
50 | 28 | pub type SinkWriterResponseReceiver = BidiStreamReceiver<SinkWriterStreamResponse>;
|
51 | 29 |
|
@@ -143,232 +121,3 @@ impl SinkCoordinatorStreamHandle {
|
143 | 121 | }
|
144 | 122 | }
|
145 | 123 | }
|
146 |
| - |
147 |
| -impl ConnectorClient { |
148 |
| - pub async fn try_new(connector_endpoint: Option<&String>) -> Option<Self> { |
149 |
| - match connector_endpoint { |
150 |
| - None => None, |
151 |
| - Some(connector_endpoint) => match ConnectorClient::new(connector_endpoint).await { |
152 |
| - Ok(client) => Some(client), |
153 |
| - Err(e) => { |
154 |
| - error!( |
155 |
| - endpoint = connector_endpoint, |
156 |
| - error = %e.as_report(), |
157 |
| - "invalid connector endpoint", |
158 |
| - ); |
159 |
| - None |
160 |
| - } |
161 |
| - }, |
162 |
| - } |
163 |
| - } |
164 |
| - |
165 |
| - #[allow(clippy::unused_async)] |
166 |
| - pub async fn new(connector_endpoint: &String) -> Result<Self> { |
167 |
| - let endpoint = Endpoint::from_shared(format!("http://{}", connector_endpoint)) |
168 |
| - .with_context(|| format!("invalid connector endpoint `{}`", connector_endpoint))? |
169 |
| - .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE) |
170 |
| - .initial_stream_window_size(STREAM_WINDOW_SIZE) |
171 |
| - .connect_timeout(Duration::from_secs(5)); |
172 |
| - |
173 |
| - let channel = { |
174 |
| - #[cfg(madsim)] |
175 |
| - { |
176 |
| - endpoint.connect().await? |
177 |
| - } |
178 |
| - #[cfg(not(madsim))] |
179 |
| - { |
180 |
| - endpoint.monitored_connect_lazy( |
181 |
| - "grpc-connector-client", |
182 |
| - TcpConfig { |
183 |
| - tcp_nodelay: true, |
184 |
| - keepalive_duration: None, |
185 |
| - }, |
186 |
| - ) |
187 |
| - } |
188 |
| - }; |
189 |
| - Ok(Self { |
190 |
| - rpc_client: ConnectorServiceClient::new(channel).max_decoding_message_size(usize::MAX), |
191 |
| - endpoint: connector_endpoint.to_string(), |
192 |
| - }) |
193 |
| - } |
194 |
| - |
195 |
| - pub fn endpoint(&self) -> &String { |
196 |
| - &self.endpoint |
197 |
| - } |
198 |
| - |
199 |
| - /// Get source event stream |
200 |
| - pub async fn start_source_stream( |
201 |
| - &self, |
202 |
| - source_id: u64, |
203 |
| - source_type: SourceType, |
204 |
| - start_offset: Option<String>, |
205 |
| - properties: BTreeMap<String, String>, |
206 |
| - snapshot_done: bool, |
207 |
| - is_source_job: bool, |
208 |
| - ) -> Result<Streaming<GetEventStreamResponse>> { |
209 |
| - Ok(self |
210 |
| - .rpc_client |
211 |
| - .clone() |
212 |
| - .get_event_stream(GetEventStreamRequest { |
213 |
| - source_id, |
214 |
| - source_type: source_type as _, |
215 |
| - start_offset: start_offset.unwrap_or_default(), |
216 |
| - properties, |
217 |
| - snapshot_done, |
218 |
| - is_source_job, |
219 |
| - }) |
220 |
| - .await |
221 |
| - .inspect_err(|err| { |
222 |
| - tracing::error!( |
223 |
| - "failed to start stream for CDC source {}: {}", |
224 |
| - source_id, |
225 |
| - err.message() |
226 |
| - ) |
227 |
| - }) |
228 |
| - .map_err(RpcError::from_connector_status)? |
229 |
| - .into_inner()) |
230 |
| - } |
231 |
| - |
232 |
| - /// Validate source properties |
233 |
| - pub async fn validate_source_properties( |
234 |
| - &self, |
235 |
| - source_id: u64, |
236 |
| - source_type: SourceType, |
237 |
| - properties: BTreeMap<String, String>, |
238 |
| - table_schema: Option<TableSchema>, |
239 |
| - is_source_job: bool, |
240 |
| - is_backfill_table: bool, |
241 |
| - ) -> Result<()> { |
242 |
| - let table_schema = table_schema.map(|mut table_schema| { |
243 |
| - table_schema.columns.retain(|c| { |
244 |
| - !matches!( |
245 |
| - c.generated_or_default_column, |
246 |
| - Some(GeneratedOrDefaultColumn::GeneratedColumn(_)) |
247 |
| - ) |
248 |
| - }); |
249 |
| - table_schema |
250 |
| - }); |
251 |
| - let response = self |
252 |
| - .rpc_client |
253 |
| - .clone() |
254 |
| - .validate_source(ValidateSourceRequest { |
255 |
| - source_id, |
256 |
| - source_type: source_type as _, |
257 |
| - properties, |
258 |
| - table_schema, |
259 |
| - is_source_job, |
260 |
| - is_backfill_table, |
261 |
| - }) |
262 |
| - .await |
263 |
| - .inspect_err(|err| { |
264 |
| - tracing::error!("failed to validate source#{}: {}", source_id, err.message()) |
265 |
| - }) |
266 |
| - .map_err(RpcError::from_connector_status)? |
267 |
| - .into_inner(); |
268 |
| - |
269 |
| - response.error.map_or(Ok(()), |err| { |
270 |
| - Err(RpcError::Internal(anyhow!(format!( |
271 |
| - "source cannot pass validation: {}", |
272 |
| - err.error_message |
273 |
| - )))) |
274 |
| - }) |
275 |
| - } |
276 |
| - |
277 |
| - pub async fn start_sink_writer_stream( |
278 |
| - &self, |
279 |
| - payload_schema: Option<TableSchema>, |
280 |
| - sink_proto: PbSinkParam, |
281 |
| - ) -> Result<SinkWriterStreamHandle> { |
282 |
| - let mut rpc_client = self.rpc_client.clone(); |
283 |
| - let (handle, first_rsp) = SinkWriterStreamHandle::initialize( |
284 |
| - SinkWriterStreamRequest { |
285 |
| - request: Some(SinkRequest::Start(StartSink { |
286 |
| - payload_schema, |
287 |
| - sink_param: Some(sink_proto), |
288 |
| - })), |
289 |
| - }, |
290 |
| - |rx| async move { |
291 |
| - rpc_client |
292 |
| - .sink_writer_stream(ReceiverStream::new(rx)) |
293 |
| - .await |
294 |
| - .map(|response| { |
295 |
| - response |
296 |
| - .into_inner() |
297 |
| - .map_err(RpcError::from_connector_status) |
298 |
| - }) |
299 |
| - .map_err(RpcError::from_connector_status) |
300 |
| - }, |
301 |
| - ) |
302 |
| - .await?; |
303 |
| - |
304 |
| - match first_rsp { |
305 |
| - SinkWriterStreamResponse { |
306 |
| - response: Some(sink_writer_stream_response::Response::Start(_)), |
307 |
| - } => Ok(handle), |
308 |
| - msg => Err(RpcError::Internal(anyhow!( |
309 |
| - "should get start response but get {:?}", |
310 |
| - msg |
311 |
| - ))), |
312 |
| - } |
313 |
| - } |
314 |
| - |
315 |
| - pub async fn start_sink_coordinator_stream( |
316 |
| - &self, |
317 |
| - param: SinkParam, |
318 |
| - ) -> Result<SinkCoordinatorStreamHandle> { |
319 |
| - let mut rpc_client = self.rpc_client.clone(); |
320 |
| - let (handle, first_rsp) = SinkCoordinatorStreamHandle::initialize( |
321 |
| - SinkCoordinatorStreamRequest { |
322 |
| - request: Some(sink_coordinator_stream_request::Request::Start( |
323 |
| - StartCoordinator { param: Some(param) }, |
324 |
| - )), |
325 |
| - }, |
326 |
| - |rx| async move { |
327 |
| - rpc_client |
328 |
| - .sink_coordinator_stream(ReceiverStream::new(rx)) |
329 |
| - .await |
330 |
| - .map(|response| { |
331 |
| - response |
332 |
| - .into_inner() |
333 |
| - .map_err(RpcError::from_connector_status) |
334 |
| - }) |
335 |
| - .map_err(RpcError::from_connector_status) |
336 |
| - }, |
337 |
| - ) |
338 |
| - .await?; |
339 |
| - |
340 |
| - match first_rsp { |
341 |
| - SinkCoordinatorStreamResponse { |
342 |
| - response: Some(sink_coordinator_stream_response::Response::Start(_)), |
343 |
| - } => Ok(handle), |
344 |
| - msg => Err(RpcError::Internal(anyhow!( |
345 |
| - "should get start response but get {:?}", |
346 |
| - msg |
347 |
| - ))), |
348 |
| - } |
349 |
| - } |
350 |
| - |
351 |
| - pub async fn validate_sink_properties(&self, sink_param: SinkParam) -> Result<()> { |
352 |
| - let response = self |
353 |
| - .rpc_client |
354 |
| - .clone() |
355 |
| - .validate_sink(ValidateSinkRequest { |
356 |
| - sink_param: Some(sink_param), |
357 |
| - }) |
358 |
| - .await |
359 |
| - .inspect_err(|err| { |
360 |
| - tracing::error!("failed to validate sink properties: {}", err.message()) |
361 |
| - }) |
362 |
| - .map_err(RpcError::from_connector_status)? |
363 |
| - .into_inner(); |
364 |
| - response.error.map_or_else( |
365 |
| - || Ok(()), // If there is no error message, return Ok here. |
366 |
| - |err| { |
367 |
| - Err(RpcError::Internal(anyhow!(format!( |
368 |
| - "sink cannot pass validation: {}", |
369 |
| - err.error_message |
370 |
| - )))) |
371 |
| - }, |
372 |
| - ) |
373 |
| - } |
374 |
| -} |
0 commit comments