Skip to content

Commit

Permalink
Merge pull request #319 from tom-binary/feature/early_cleanup
Browse files Browse the repository at this point in the history
Clean up before we hit the stream limit
  • Loading branch information
tom-binary authored Jun 6, 2024
2 parents 259c8a0 + 8197884 commit 966164c
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions lib/Myriad/Subscription/Implementation/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -252,20 +252,23 @@ async method check_for_overflow () {
push @emitters, $emitter;
try {
my $len = await $redis->stream_length($emitter->{stream});
if ($len >= $emitter->{max_len}) {
unless ($emitter->{source}->is_paused) {
$emitter->{source}->pause;
$log->infof('Paused subscription source on %s, length is %s, max allowed %s', $emitter->{stream}, $len, $emitter->{max_len});
}
if ($len >= 0.75 * $emitter->{max_len}) {
# Try a regular cleanup if we're getting close to the limit
await $redis->cleanup(
stream => $emitter->{stream},
limit => $emitter->{max_len}
);
} else {
if($emitter->{source}->is_paused) {
$emitter->{source}->resume;
$log->infof('Resumed subscription source on %s, length is %s', $emitter->{stream}, $len);
$len = await $redis->stream_length($emitter->{stream});
}

if ($len >= $emitter->{max_len}) {
unless ($emitter->{source}->is_paused) {
$emitter->{source}->pause;
$log->infof('Paused subscription source on %s, length is %s, max allowed %s', $emitter->{stream}, $len, $emitter->{max_len});
}
} elsif ($emitter->{source}->is_paused) {
$emitter->{source}->resume;
$log->infof('Resumed subscription source on %s, length is %s', $emitter->{stream}, $len);
}
} catch ($e) {
$log->warnf('An error ocurred while trying to check on stream %s status - %s', $emitter->{stream}, $e);
Expand Down

0 comments on commit 966164c

Please sign in to comment.