Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include support for subscription subchannels #318

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile.PL
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# This file was automatically generated by Dist::Zilla::Plugin::MakeMaker v6.031.
# This file was automatically generated by Dist::Zilla::Plugin::MakeMaker v6.032.
use strict;
use warnings;

Expand Down
2 changes: 2 additions & 0 deletions lib/Myriad/Service/Attributes.pm
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ Takes the following parameters as a hashref:

=item * C<compress_threshold> - compress any data which would be larger than the given size after encoding, in bytes

=item * C<subchannel_key> - emit to zero or more separate streams defined by this key in the emitted items

=back

=cut
Expand Down
5 changes: 5 additions & 0 deletions lib/Myriad/Service/Implementation.pm
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ async method load () {
max_len => $spec->{args}{max_len},
compress => $spec->{args}{compress},
compress_threshold => $spec->{args}{compress_threshold},
subchannel_key => $spec->{args}{subchannel_key},
);
}
}
Expand All @@ -271,6 +272,10 @@ async method load () {
$log->tracef('Adding Receiver %s as %s for %s', $method, $receivers->{$method}, $service_name);
my $spec = $receivers->{$method};
my $chan = $spec->{args}{channel} // die 'expected a channel, but there was none to be found';
if(defined($spec->{args}{subchannel})) {
my $k = $spec->{args}{subchannel};
$chan .= "{$k}";
}
my $sink = $spec->{sink} = $ryu->sink(
label => "receiver:$chan",
);
Expand Down
24 changes: 19 additions & 5 deletions lib/Myriad/Subscription/Implementation/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,33 @@ async method create_from_source (%args) {

$log->tracef('Adding subscription source %s to handler', $stream);
push @emitters, {
stream => $stream,
source => $src,
stream => $stream,
source => $src,
max_len => $args{max_len} // MAX_ALLOWED_STREAM_LENGTH
};
} unless exists $args{subchannel_key};
my %seen_channel;
$self->adopt_future(
$src->unblocked->then($self->$curry::weak(async method {
# The streams will be checked later by "check_for_overflow" to avoid unblocking the source by mistake
# we will make "check_for_overflow" aware about this stream after the service has started
await $src->map($self->$curry::weak(method ($event) {
$log->tracef('Subscription source %s adding an event: %s', $stream, $event);
my $target_stream = $stream;
if(defined($args{subchannel_key})) {
my $k = delete($event->{$args{subchannel_key}});
$target_stream .= "{$k}";
if(!exists $seen_channel{$k}) {
push @emitters, {
stream => $target_stream,
source => $src,
max_len => $args{max_len} // MAX_ALLOWED_STREAM_LENGTH
};
$seen_channel{$k} = 1;
}
}
$log->tracef('Subscription source %s adding an event: %s', $target_stream, $event);
my $data = encode_json_utf8($event);
return $redis->xadd(
encode_utf8($stream) => '*',
encode_utf8($target_stream) => '*',
($args{compress} || (defined $args{compress_threshold} and length($data) > $args{compress_threshold}))
? (zstd => Compress::Zstd::compress($data))
: (data => $data)
Expand Down
Loading