Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions lib/Myriad.pm
Original file line number Diff line number Diff line change
Expand Up @@ -311,18 +311,7 @@ async method configure_from_argv (@args) {
# At this point, we expect `@args` to contain only the plain
# parameters such as the service name or a request to run an RPC
# method.
my $method = 'service';
while(@args) {
my $arg = shift @args;
if($commands->can($arg)) {
$method = $arg;
await $commands->$method(shift @args, @args);
last;
} else {
await $commands->$method($arg, @args);
last;
}
}
await $commands->from_array(@args);

$self->on_start(async sub {
await $config->listen_for_updates;
Expand Down Expand Up @@ -785,7 +774,8 @@ async method run () {
# Set shutdown future before starting commands.
$self->shutdown_future();

$commands->run_cmd->retain()->on_fail(sub {
$commands->run_commands->retain()->on_fail(sub ($e, @) {
$log->errorf('Command failed: %s', $e);
$self->shutdown->await();
});

Expand Down
57 changes: 43 additions & 14 deletions lib/Myriad/Commands.pm
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,42 @@ use Myriad::Util::UUID;
use Myriad::Service::Remote;

has $myriad;
has $cmd;
has $commands { [] };

BUILD (%args) {
weaken(
$myriad = $args{myriad} // die 'needs a Myriad parent object'
);
}

async method from_array (@args) {
while(@args) {
my $method = shift @args;
$log->infof('Attempting command %s with remaining %s', $method, \@args);
if($self->can($method)) {
await $self->$method(\@args);
} else {
# Put this back in and try again
unshift @args, $method;
await $self->service(\@args);
}
}
}

=head2 service

Attempts to load and start one or more services.

=cut

async method service (@args) {
async method service ($args) {
my (@modules, $namespace);
while(my $entry = shift @args) {
return unless $args->@*;

my @pending = split /,/, shift(@$args);
while(my $entry = shift @pending) {
if($entry =~ /,/) {
unshift @args, split /,/, $entry;
unshift @pending, split /,/, $entry;
} elsif(my ($base, $hierarchy) = $entry =~ m{^([a-z0-9_:]+)::(\*(?:::\*)*)?$}i) {
$namespace = $base . '::' if $hierarchy;
require Module::Pluggable::Object;
Expand Down Expand Up @@ -93,7 +110,7 @@ async method service (@args) {
}
}, foreach => \@services_modules, concurrent => 4);

$cmd = {
push $commands->@*, {
code => async sub {
try {
await fmap0 {
Expand All @@ -111,6 +128,7 @@ async method service (@args) {
},
params => {},
};
return;
}

=head2 remote_service
Expand All @@ -121,7 +139,7 @@ method remote_service {
return Myriad::Service::Remote->new(
myriad => $myriad,
service_name => $myriad->registry->make_service_name(
$myriad->config->service_name->as_string
$myriad->config->service_name('')->as_string, ''
) // die 'no service name found'
);
}
Expand All @@ -133,7 +151,7 @@ method remote_service {
async method rpc ($rpc, @args) {
my $remote_service = $self->remote_service;
die 'RPC args should be passed as (key value) pairs' unless @args % 2 == 0;
$cmd = {
push $commands->@*, {
code => async sub {
my $params = shift;
my ($remote_service, $rpc, $args) = map { $params->{$_} } qw(remote_service rpc args);
Expand All @@ -145,7 +163,11 @@ async method rpc ($rpc, @args) {
}
await $myriad->shutdown;
},
params => { rpc => $rpc, args => \@args, remote_service => $remote_service}
params => {
rpc => $rpc,
args => \@args,
remote_service => $remote_service
}
};
}

Expand All @@ -158,7 +180,7 @@ async method subscription ($stream, @args) {
my $uuid = Myriad::Util::UUID::uuid();
my $subscription = await $remote_service->subscribe($stream, "$0/$uuid");
$log->infof('Subscribing to: %s | %s', $remote_service->service_name, $stream);
$cmd = {
push $commands->@*, {
code => async sub {
my $params = shift;
my ($subscription, $args) = @{$params}{qw(subscription args)};
Expand All @@ -178,7 +200,7 @@ async method subscription ($stream, @args) {

async method storage ($action, $key, $extra = undef) {
my $remote_service = Myriad::Service::Remote->new(myriad => $myriad, service_name => $myriad->registry->make_service_name($myriad->config->service_name->as_string));
$cmd = {
push $commands->@*, {
code => async sub {
my $params = shift;
my ($remote_service, $action, $key, $extra) = map { $params->{$_} } qw(remote_service action key extra);
Expand All @@ -192,15 +214,22 @@ async method storage ($action, $key, $extra = undef) {

await $myriad->shutdown;
},
params => { action => $action, key => $key, extra => $extra, remote_service => $remote_service} };
params => { action => $action, key => $key, extra => $extra, remote_service => $remote_service}
};
}

=head2 run_cmd
=head2 run_commands

Execute all the pending commands collected up so far.

=cut

async method run_cmd () {
await $cmd->{code}->($cmd->{params}) if exists $cmd->{code};
async method run_commands () {
await Future->needs_all(
map {;
$_->{code} ? $_->{code}->($_->{params}) : ()
} splice @$commands, 0
);
}

1;
Expand Down
10 changes: 6 additions & 4 deletions t/commands.t
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,19 @@ subtest "service command" => sub {
$metaclass->get_field('$config')->value($myriad) = Myriad::Config->new();

# Wrong Service(module) name
like( exception { wait_for_future( $command->service('Ta-wrong') )->get } , qr/unsupported/, 'Died when passing wrong format name');
like( exception { wait_for_future( $command->service('Ta_wrong') )->get } , qr/not found/, 'Died when passing module that does not exist');
like( exception { wait_for_future( $command->service(['Ta-wrong']) )->get } , qr/unsupported/, 'Died when passing wrong format name');
like( exception { wait_for_future( $command->service(['Ta_wrong']) )->get } , qr/not found/, 'Died when passing module that does not exist');

# Running multiple services
wait_for_future( $command->service('Ta::')->get->{code}->() )->get;
wait_for_future( $command->service(['Ta::']) )->get;
wait_for_future( $command->run_commands )->get;
cmp_deeply(\@added_services_modules, ['Ta::Sibling1', 'Ta::Sibling2'], 'Added both modules');
# Clear it for next test.
@added_services_modules = ();

# Running services under the same namespace
wait_for_future( $command->service('Ta::*')->get->{code}->() )->get;
wait_for_future( $command->service(['Ta::*']) )->get;
wait_for_future( $command->run_commands )->get;
cmp_deeply(\@added_services_modules, ['Ta::Sibling1', 'Ta::Sibling2'], 'Added modules under the namespace');
# Clear it for next test.
@added_services_modules = ();
Expand Down