From 78592a411116765f139cf26f76e41e42f86d3fdc Mon Sep 17 00:00:00 2001 From: Tom Molesworth Date: Mon, 25 Apr 2022 08:50:07 +0800 Subject: [PATCH] Support multiple operations for myriad.pl commandline invocation --- lib/Myriad.pm | 16 +++--------- lib/Myriad/Commands.pm | 57 +++++++++++++++++++++++++++++++----------- t/commands.t | 10 +++++--- 3 files changed, 52 insertions(+), 31 deletions(-) diff --git a/lib/Myriad.pm b/lib/Myriad.pm index aa25ef44..fadfcf93 100644 --- a/lib/Myriad.pm +++ b/lib/Myriad.pm @@ -311,18 +311,7 @@ async method configure_from_argv (@args) { # At this point, we expect `@args` to contain only the plain # parameters such as the service name or a request to run an RPC # method. - my $method = 'service'; - while(@args) { - my $arg = shift @args; - if($commands->can($arg)) { - $method = $arg; - await $commands->$method(shift @args, @args); - last; - } else { - await $commands->$method($arg, @args); - last; - } - } + await $commands->from_array(@args); $self->on_start(async sub { await $config->listen_for_updates; @@ -785,7 +774,8 @@ async method run () { # Set shutdown future before starting commands. $self->shutdown_future(); - $commands->run_cmd->retain()->on_fail(sub { + $commands->run_commands->retain()->on_fail(sub ($e, @) { + $log->errorf('Command failed: %s', $e); $self->shutdown->await(); }); diff --git a/lib/Myriad/Commands.pm b/lib/Myriad/Commands.pm index 8019c056..5f540cbf 100644 --- a/lib/Myriad/Commands.pm +++ b/lib/Myriad/Commands.pm @@ -27,7 +27,7 @@ use Myriad::Util::UUID; use Myriad::Service::Remote; has $myriad; -has $cmd; +has $commands { [] }; BUILD (%args) { weaken( @@ -35,17 +35,34 @@ BUILD (%args) { ); } +async method from_array (@args) { + while(@args) { + my $method = shift @args; + $log->infof('Attempting command %s with remaining %s', $method, \@args); + if($self->can($method)) { + await $self->$method(\@args); + } else { + # Put this back in and try again + unshift @args, $method; + await $self->service(\@args); + } + } +} + =head2 service Attempts to load and start one or more services. =cut -async method service (@args) { +async method service ($args) { my (@modules, $namespace); - while(my $entry = shift @args) { + return unless $args->@*; + + my @pending = split /,/, shift(@$args); + while(my $entry = shift @pending) { if($entry =~ /,/) { - unshift @args, split /,/, $entry; + unshift @pending, split /,/, $entry; } elsif(my ($base, $hierarchy) = $entry =~ m{^([a-z0-9_:]+)::(\*(?:::\*)*)?$}i) { $namespace = $base . '::' if $hierarchy; require Module::Pluggable::Object; @@ -93,7 +110,7 @@ async method service (@args) { } }, foreach => \@services_modules, concurrent => 4); - $cmd = { + push $commands->@*, { code => async sub { try { await fmap0 { @@ -111,6 +128,7 @@ async method service (@args) { }, params => {}, }; + return; } =head2 remote_service @@ -121,7 +139,7 @@ method remote_service { return Myriad::Service::Remote->new( myriad => $myriad, service_name => $myriad->registry->make_service_name( - $myriad->config->service_name->as_string + $myriad->config->service_name('')->as_string, '' ) // die 'no service name found' ); } @@ -133,7 +151,7 @@ method remote_service { async method rpc ($rpc, @args) { my $remote_service = $self->remote_service; die 'RPC args should be passed as (key value) pairs' unless @args % 2 == 0; - $cmd = { + push $commands->@*, { code => async sub { my $params = shift; my ($remote_service, $rpc, $args) = map { $params->{$_} } qw(remote_service rpc args); @@ -145,7 +163,11 @@ async method rpc ($rpc, @args) { } await $myriad->shutdown; }, - params => { rpc => $rpc, args => \@args, remote_service => $remote_service} + params => { + rpc => $rpc, + args => \@args, + remote_service => $remote_service + } }; } @@ -158,7 +180,7 @@ async method subscription ($stream, @args) { my $uuid = Myriad::Util::UUID::uuid(); my $subscription = await $remote_service->subscribe($stream, "$0/$uuid"); $log->infof('Subscribing to: %s | %s', $remote_service->service_name, $stream); - $cmd = { + push $commands->@*, { code => async sub { my $params = shift; my ($subscription, $args) = @{$params}{qw(subscription args)}; @@ -178,7 +200,7 @@ async method subscription ($stream, @args) { async method storage ($action, $key, $extra = undef) { my $remote_service = Myriad::Service::Remote->new(myriad => $myriad, service_name => $myriad->registry->make_service_name($myriad->config->service_name->as_string)); - $cmd = { + push $commands->@*, { code => async sub { my $params = shift; my ($remote_service, $action, $key, $extra) = map { $params->{$_} } qw(remote_service action key extra); @@ -192,15 +214,22 @@ async method storage ($action, $key, $extra = undef) { await $myriad->shutdown; }, - params => { action => $action, key => $key, extra => $extra, remote_service => $remote_service} }; + params => { action => $action, key => $key, extra => $extra, remote_service => $remote_service} + }; } -=head2 run_cmd +=head2 run_commands + +Execute all the pending commands collected up so far. =cut -async method run_cmd () { - await $cmd->{code}->($cmd->{params}) if exists $cmd->{code}; +async method run_commands () { + await Future->needs_all( + map {; + $_->{code} ? $_->{code}->($_->{params}) : () + } splice @$commands, 0 + ); } 1; diff --git a/t/commands.t b/t/commands.t index 378decb4..f3a8fb9e 100644 --- a/t/commands.t +++ b/t/commands.t @@ -73,17 +73,19 @@ subtest "service command" => sub { $metaclass->get_field('$config')->value($myriad) = Myriad::Config->new(); # Wrong Service(module) name - like( exception { wait_for_future( $command->service('Ta-wrong') )->get } , qr/unsupported/, 'Died when passing wrong format name'); - like( exception { wait_for_future( $command->service('Ta_wrong') )->get } , qr/not found/, 'Died when passing module that does not exist'); + like( exception { wait_for_future( $command->service(['Ta-wrong']) )->get } , qr/unsupported/, 'Died when passing wrong format name'); + like( exception { wait_for_future( $command->service(['Ta_wrong']) )->get } , qr/not found/, 'Died when passing module that does not exist'); # Running multiple services - wait_for_future( $command->service('Ta::')->get->{code}->() )->get; + wait_for_future( $command->service(['Ta::']) )->get; + wait_for_future( $command->run_commands )->get; cmp_deeply(\@added_services_modules, ['Ta::Sibling1', 'Ta::Sibling2'], 'Added both modules'); # Clear it for next test. @added_services_modules = (); # Running services under the same namespace - wait_for_future( $command->service('Ta::*')->get->{code}->() )->get; + wait_for_future( $command->service(['Ta::*']) )->get; + wait_for_future( $command->run_commands )->get; cmp_deeply(\@added_services_modules, ['Ta::Sibling1', 'Ta::Sibling2'], 'Added modules under the namespace'); # Clear it for next test. @added_services_modules = ();