diff --git a/Makefile.PL b/Makefile.PL index 64895b75..d3312ee2 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -1,4 +1,4 @@ -# This file was automatically generated by Dist::Zilla::Plugin::MakeMaker v6.031. +# This file was automatically generated by Dist::Zilla::Plugin::MakeMaker v6.032. use strict; use warnings; diff --git a/lib/Myriad/Service/Attributes.pm b/lib/Myriad/Service/Attributes.pm index 0620ed81..e2734408 100644 --- a/lib/Myriad/Service/Attributes.pm +++ b/lib/Myriad/Service/Attributes.pm @@ -137,6 +137,8 @@ Takes the following parameters as a hashref: =item * C - compress any data which would be larger than the given size after encoding, in bytes +=item * C - emit to zero or more separate streams defined by this key in the emitted items + =back =cut diff --git a/lib/Myriad/Service/Implementation.pm b/lib/Myriad/Service/Implementation.pm index b8c49bae..6fbe1df5 100644 --- a/lib/Myriad/Service/Implementation.pm +++ b/lib/Myriad/Service/Implementation.pm @@ -261,6 +261,7 @@ async method load () { max_len => $spec->{args}{max_len}, compress => $spec->{args}{compress}, compress_threshold => $spec->{args}{compress_threshold}, + subchannel_key => $spec->{args}{subchannel_key}, ); } } @@ -271,6 +272,10 @@ async method load () { $log->tracef('Adding Receiver %s as %s for %s', $method, $receivers->{$method}, $service_name); my $spec = $receivers->{$method}; my $chan = $spec->{args}{channel} // die 'expected a channel, but there was none to be found'; + if(defined($spec->{args}{subchannel})) { + my $k = $spec->{args}{subchannel}; + $chan .= "{$k}"; + } my $sink = $spec->{sink} = $ryu->sink( label => "receiver:$chan", ); diff --git a/lib/Myriad/Subscription/Implementation/Redis.pm b/lib/Myriad/Subscription/Implementation/Redis.pm index 3b873f59..da2bcc29 100644 --- a/lib/Myriad/Subscription/Implementation/Redis.pm +++ b/lib/Myriad/Subscription/Implementation/Redis.pm @@ -48,19 +48,33 @@ async method create_from_source (%args) { $log->tracef('Adding subscription source %s to handler', $stream); push @emitters, { - stream => $stream, - source => $src, + stream => $stream, + source => $src, max_len => $args{max_len} // MAX_ALLOWED_STREAM_LENGTH - }; + } unless exists $args{subchannel_key}; + my %seen_channel; $self->adopt_future( $src->unblocked->then($self->$curry::weak(async method { # The streams will be checked later by "check_for_overflow" to avoid unblocking the source by mistake # we will make "check_for_overflow" aware about this stream after the service has started await $src->map($self->$curry::weak(method ($event) { - $log->tracef('Subscription source %s adding an event: %s', $stream, $event); + my $target_stream = $stream; + if(defined($args{subchannel_key})) { + my $k = delete($event->{$args{subchannel_key}}); + $target_stream .= "{$k}"; + if(!exists $seen_channel{$k}) { + push @emitters, { + stream => $target_stream, + source => $src, + max_len => $args{max_len} // MAX_ALLOWED_STREAM_LENGTH + }; + $seen_channel{$k} = 1; + } + } + $log->tracef('Subscription source %s adding an event: %s', $target_stream, $event); my $data = encode_json_utf8($event); return $redis->xadd( - encode_utf8($stream) => '*', + encode_utf8($target_stream) => '*', ($args{compress} || (defined $args{compress_threshold} and length($data) > $args{compress_threshold})) ? (zstd => Compress::Zstd::compress($data)) : (data => $data)