From 7bfd7bd2a648483a9cd24525eacffc32dc98a1d5 Mon Sep 17 00:00:00 2001 From: Andreas Bergmaier <53600133+ab-pm@users.noreply.github.com> Date: Fri, 10 Sep 2021 11:15:42 +0200 Subject: [PATCH 1/3] Flag to stop LDS loop while no timer is active Fixes #755 --- packages/lds/src/index.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/lds/src/index.ts b/packages/lds/src/index.ts index 4e963da2d..8d6a008f3 100755 --- a/packages/lds/src/index.ts +++ b/packages/lds/src/index.ts @@ -92,16 +92,19 @@ export default async function subscribeToLogicalDecoding( } let loopTimeout: NodeJS.Timer; + let nextSleepDuration = 0; const ldSubscription = { close: async () => { clearTimeout(loopTimeout); + nextSleepDuration = 0; await client.close(); }, }; let nextStaleCheck = Date.now() + DROP_STALE_SLOTS_INTERVAL; async function loop() { + nextSleepDuration = sleepDuration; try { const rows = await client.getChanges(null, 500); if (rows.length) { @@ -161,10 +164,11 @@ export default async function subscribeToLogicalDecoding( } catch (e) { console.error("Error during LDS loop:", e.message); // Recovery time... - loopTimeout = setTimeout(loop, sleepDuration * 10); - return; + nextSleepDuration *= 10; + } + if (nextSleepDuration) { // else loop has been stopped + loopTimeout = setTimeout(loop, nextSleepDuration); } - loopTimeout = setTimeout(loop, sleepDuration); } loop(); return ldSubscription; From 2ca6a7e5808340fe39c4836275cba4b8063e2b34 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 10 Sep 2021 13:17:04 +0100 Subject: [PATCH 2/3] Apply suggestions from code review --- packages/lds/src/index.ts | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/packages/lds/src/index.ts b/packages/lds/src/index.ts index 8d6a008f3..1eb8a1aa4 100755 --- a/packages/lds/src/index.ts +++ b/packages/lds/src/index.ts @@ -92,18 +92,20 @@ export default async function subscribeToLogicalDecoding( } let loopTimeout: NodeJS.Timer; - let nextSleepDuration = 0; + /** A value of `-1` means to stop looping. */ + let nextSleepDuration = -1; const ldSubscription = { close: async () => { clearTimeout(loopTimeout); - nextSleepDuration = 0; + nextSleepDuration = -1; await client.close(); }, }; let nextStaleCheck = Date.now() + DROP_STALE_SLOTS_INTERVAL; async function loop() { + // Reset the sleep duration (useful after an error where sleep is 10x larger) nextSleepDuration = sleepDuration; try { const rows = await client.getChanges(null, 500); @@ -164,10 +166,15 @@ export default async function subscribeToLogicalDecoding( } catch (e) { console.error("Error during LDS loop:", e.message); // Recovery time... - nextSleepDuration *= 10; + if (nextSleepDuration >= 0) { + nextSleepDuration = sleepDuration * 10; + } } - if (nextSleepDuration) { // else loop has been stopped + if (nextSleepDuration >= 0) { loopTimeout = setTimeout(loop, nextSleepDuration); + } else { + // loop() was stopped; abort + return; } } loop(); From 02c6c8ee0501f83de34147ce2079f7432764fb96 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 13 Sep 2021 10:04:04 +0100 Subject: [PATCH 3/3] Apply suggestions from code review --- packages/lds/src/index.ts | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/packages/lds/src/index.ts b/packages/lds/src/index.ts index 1eb8a1aa4..ad0a8d9e7 100755 --- a/packages/lds/src/index.ts +++ b/packages/lds/src/index.ts @@ -92,23 +92,26 @@ export default async function subscribeToLogicalDecoding( } let loopTimeout: NodeJS.Timer; - /** A value of `-1` means to stop looping. */ - let nextSleepDuration = -1; + /** Set true to halt the loop. */ + let isClosing = false; const ldSubscription = { close: async () => { clearTimeout(loopTimeout); - nextSleepDuration = -1; + // If the `loop` function is currently in progress, tell it not to queue again. + isClosing = true; await client.close(); }, }; let nextStaleCheck = Date.now() + DROP_STALE_SLOTS_INTERVAL; async function loop() { - // Reset the sleep duration (useful after an error where sleep is 10x larger) - nextSleepDuration = sleepDuration; try { const rows = await client.getChanges(null, 500); + if (isClosing) { + // Skip processing this data and exit. + return; + } if (rows.length) { for (const row of rows) { const { @@ -166,16 +169,14 @@ export default async function subscribeToLogicalDecoding( } catch (e) { console.error("Error during LDS loop:", e.message); // Recovery time... - if (nextSleepDuration >= 0) { - nextSleepDuration = sleepDuration * 10; + if (!isClosing) { + loopTimeout = setTimeout(loop, sleepDuration * 10); } - } - if (nextSleepDuration >= 0) { - loopTimeout = setTimeout(loop, nextSleepDuration); - } else { - // loop() was stopped; abort return; } + if (!isClosing) { + loopTimeout = setTimeout(loop, sleepDuration); + } } loop(); return ldSubscription;