Skip to content

Commit

Permalink
Merge pull request #320 from tom-binary/bugfix/async_subscription_ove…
Browse files Browse the repository at this point in the history
…rflow

Since we have await calls in here, switch to async method
  • Loading branch information
tom-binary authored Jun 6, 2024
2 parents 966164c + 75ae869 commit ab798ab
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions lib/Myriad/Subscription/Implementation/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async method create_from_source (%args) {
$src->unblocked->then($self->$curry::weak(async method {
# The streams will be checked later by "check_for_overflow" to avoid unblocking the source by mistake
# we will make "check_for_overflow" aware about this stream after the service has started
await $src->map($self->$curry::weak(method ($event) {
await $src->map($self->$curry::weak(async method ($event) {
my $target_stream = $stream;
if(defined($args{subchannel_key})) {
my $k = delete($event->{$args{subchannel_key}});
Expand All @@ -73,7 +73,7 @@ async method create_from_source (%args) {
}
$log->tracef('Subscription source %s adding an event: %s', $target_stream, $event);
my $data = encode_json_utf8($event);
return $redis->xadd(
return await $redis->xadd(
encode_utf8($target_stream) => '*',
($args{compress} || (defined $args{compress_threshold} and length($data) > $args{compress_threshold}))
? (zstd => Compress::Zstd::compress($data))
Expand Down

0 comments on commit ab798ab

Please sign in to comment.