From 81978841b05edeaa9cab42cb233ad788f405e147 Mon Sep 17 00:00:00 2001 From: Tom Molesworth Date: Fri, 24 May 2024 16:06:01 +0800 Subject: [PATCH] Clean up before we hit the stream limit --- .../Subscription/Implementation/Redis.pm | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/lib/Myriad/Subscription/Implementation/Redis.pm b/lib/Myriad/Subscription/Implementation/Redis.pm index da2bcc29..88adde1f 100644 --- a/lib/Myriad/Subscription/Implementation/Redis.pm +++ b/lib/Myriad/Subscription/Implementation/Redis.pm @@ -252,20 +252,23 @@ async method check_for_overflow () { push @emitters, $emitter; try { my $len = await $redis->stream_length($emitter->{stream}); - if ($len >= $emitter->{max_len}) { - unless ($emitter->{source}->is_paused) { - $emitter->{source}->pause; - $log->infof('Paused subscription source on %s, length is %s, max allowed %s', $emitter->{stream}, $len, $emitter->{max_len}); - } + if ($len >= 0.75 * $emitter->{max_len}) { + # Try a regular cleanup if we're getting close to the limit await $redis->cleanup( stream => $emitter->{stream}, limit => $emitter->{max_len} ); - } else { - if($emitter->{source}->is_paused) { - $emitter->{source}->resume; - $log->infof('Resumed subscription source on %s, length is %s', $emitter->{stream}, $len); + $len = await $redis->stream_length($emitter->{stream}); + } + + if ($len >= $emitter->{max_len}) { + unless ($emitter->{source}->is_paused) { + $emitter->{source}->pause; + $log->infof('Paused subscription source on %s, length is %s, max allowed %s', $emitter->{stream}, $len, $emitter->{max_len}); } + } elsif ($emitter->{source}->is_paused) { + $emitter->{source}->resume; + $log->infof('Resumed subscription source on %s, length is %s', $emitter->{stream}, $len); } } catch ($e) { $log->warnf('An error ocurred while trying to check on stream %s status - %s', $emitter->{stream}, $e);