From 75ae869b8e8ed335a3dcd0f18529d9321d06645e Mon Sep 17 00:00:00 2001 From: Tom Molesworth Date: Wed, 5 Jun 2024 03:34:29 +0800 Subject: [PATCH] Since we have await calls in here, switch to async method --- lib/Myriad/Subscription/Implementation/Redis.pm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/Myriad/Subscription/Implementation/Redis.pm b/lib/Myriad/Subscription/Implementation/Redis.pm index 88adde1f..efd8b9a1 100644 --- a/lib/Myriad/Subscription/Implementation/Redis.pm +++ b/lib/Myriad/Subscription/Implementation/Redis.pm @@ -57,7 +57,7 @@ async method create_from_source (%args) { $src->unblocked->then($self->$curry::weak(async method { # The streams will be checked later by "check_for_overflow" to avoid unblocking the source by mistake # we will make "check_for_overflow" aware about this stream after the service has started - await $src->map($self->$curry::weak(method ($event) { + await $src->map($self->$curry::weak(async method ($event) { my $target_stream = $stream; if(defined($args{subchannel_key})) { my $k = delete($event->{$args{subchannel_key}}); @@ -73,7 +73,7 @@ async method create_from_source (%args) { } $log->tracef('Subscription source %s adding an event: %s', $target_stream, $event); my $data = encode_json_utf8($event); - return $redis->xadd( + return await $redis->xadd( encode_utf8($target_stream) => '*', ($args{compress} || (defined $args{compress_threshold} and length($data) > $args{compress_threshold})) ? (zstd => Compress::Zstd::compress($data))