Skip to content

Commit 07ce291

Browse files
authored
fix: make Postgres source change listening connection more stable (#1264)
1 parent 0d2d24a commit 07ce291

File tree

1 file changed

+36
-6
lines changed

1 file changed

+36
-6
lines changed

src/ops/sources/postgres.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::fmt::Write;
1212

1313
type PgValueDecoder = fn(&sqlx::postgres::PgRow, usize) -> Result<Value>;
1414

15+
const LISTENER_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(45);
1516
#[derive(Clone)]
1617
struct FieldSchemaInfo {
1718
schema: FieldSchema,
@@ -527,12 +528,41 @@ impl SourceExecutor for PostgresSourceExecutor {
527528
listener.listen(&notification_ctx.channel_name).await?;
528529

529530
let stream = stream! {
530-
while let Ok(notification) = listener.recv().await {
531-
let change = self.parse_notification_payload(&notification);
532-
yield change.map(|change| SourceChangeMessage {
533-
changes: vec![change],
534-
ack_fn: None,
535-
});
531+
loop {
532+
let mut heartbeat = tokio::time::interval(LISTENER_HEARTBEAT_INTERVAL);
533+
loop {
534+
tokio::select! {
535+
notification = listener.recv() => {
536+
let notification = match notification {
537+
Ok(notification) => notification,
538+
Err(e) => {
539+
warn!("Failed to receive notification from channel {}: {e:?}", notification_ctx.channel_name);
540+
break;
541+
}
542+
};
543+
let change = self.parse_notification_payload(&notification);
544+
yield change.map(|change| SourceChangeMessage {
545+
changes: vec![change],
546+
ack_fn: None,
547+
});
548+
}
549+
550+
_ = heartbeat.tick() => {
551+
let ok = tokio::time::timeout(std::time::Duration::from_secs(5),
552+
sqlx::query("SELECT 1").execute(&mut listener)
553+
).await.is_ok();
554+
if !ok {
555+
warn!("Listener heartbeat failed for channel {}", notification_ctx.channel_name);
556+
break;
557+
}
558+
559+
}
560+
}
561+
}
562+
std::mem::drop(listener);
563+
info!("Reconnecting to listener {}", notification_ctx.channel_name);
564+
listener = PgListener::connect_with(&self.db_pool).await?;
565+
listener.listen(&notification_ctx.channel_name).await?;
536566
}
537567
};
538568

0 commit comments

Comments
 (0)