diff --git a/lib/Myriad/Class.pm b/lib/Myriad/Class.pm index 5315527d..8f1b9b48 100644 --- a/lib/Myriad/Class.pm +++ b/lib/Myriad/Class.pm @@ -215,10 +215,21 @@ use Heap; use IO::Async::Notifier; use Log::Any qw($log); -use OpenTelemetry; -use OpenTracing::Any qw($tracer); use Metrics::Any; +use constant USE_OPENTELEMETRY => $ENV{USE_OPENTELEMETRY}; + +BEGIN { + if(USE_OPENTELEMETRY) { + require OpenTelemetry; + require OpenTelemetry::Context; + require OpenTelemetry::Trace; + require OpenTelemetry::Constants; + } +} + +require OpenTracing::Any; + sub import { my $called_on = shift; @@ -359,12 +370,19 @@ sub import { { no strict 'refs'; if($version >= 2) { - my $provider = OpenTelemetry->tracer_provider; - *{$pkg . '::log'} = \(OpenTelemetry->logger); - *{$pkg . '::tracer'} = \($provider->tracer( - name => 'myriad', - version => $version, - )); + if(USE_OPENTELEMETRY) { + my $provider = OpenTelemetry->tracer_provider; + *{$pkg . '::log'} = \(OpenTelemetry->logger); + *{$pkg . '::tracer'} = \($provider->tracer( + name => 'myriad', + version => $version, + )); + } else { + *{$pkg . '::log'} = \Log::Any->get_logger( + category => $pkg + ); + *{$pkg . '::tracer'} = \undef; + } } else { # Essentially the same as importing Log::Any qw($log) for now, # but we may want to customise this with some additional attributes. diff --git a/lib/Myriad/Service/Implementation.pm b/lib/Myriad/Service/Implementation.pm index 882dcac6..f234c0c2 100644 --- a/lib/Myriad/Service/Implementation.pm +++ b/lib/Myriad/Service/Implementation.pm @@ -23,14 +23,19 @@ use Myriad::Util::UUID; use Myriad::Service::Attributes; -use OpenTelemetry::Context; -use OpenTelemetry::Trace; -use OpenTelemetry::Constants qw( SPAN_STATUS_ERROR SPAN_STATUS_OK ); - # Only defer up to this many seconds between batch iterations use constant MAX_EXPONENTIAL_BACKOFF => 2; -use constant USE_OPENTELEMETRY => 0; +use constant USE_OPENTELEMETRY => $ENV{USE_OPENTELEMETRY} || 0; + +BEGIN { + if(USE_OPENTELEMETRY) { + require OpenTelemetry::Context; + require OpenTelemetry::Trace; + require OpenTelemetry::Constants; + OpenTelemetry::Constants->import(qw( SPAN_STATUS_ERROR SPAN_STATUS_OK )); + } +} sub MODIFY_CODE_ATTRIBUTES { my ($class, $code, @attrs) = @_; diff --git a/lib/Myriad/Subscription/Implementation/Memory.pm b/lib/Myriad/Subscription/Implementation/Memory.pm index ac966ada..91a74624 100644 --- a/lib/Myriad/Subscription/Implementation/Memory.pm +++ b/lib/Myriad/Subscription/Implementation/Memory.pm @@ -5,9 +5,16 @@ use Myriad::Class ':v2', extends => qw(IO::Async::Notifier), does => [ 'Myriad::Util::Defer' ]; -use OpenTelemetry::Context; -use OpenTelemetry::Trace; -use OpenTelemetry::Constants qw( SPAN_STATUS_ERROR SPAN_STATUS_OK ); +use constant USE_OPENTELEMETRY => $ENV{USE_OPENTELEMETRY}; + +BEGIN { + if(USE_OPENTELEMETRY) { + require OpenTelemetry::Context; + require OpenTelemetry::Trace; + require OpenTelemetry::Constants; + OpenTelemetry::Constants->import(qw( SPAN_STATUS_ERROR SPAN_STATUS_OK )); + } +} # VERSION # AUTHORITY @@ -96,37 +103,55 @@ async method start { $subscription->{group_name}, 'consumer' ); - for my $event_id (sort keys $messages->%*) { - my $span = $tracer->create_span( - parent => OpenTelemetry::Context->current, - name => $subscription->{channel}, - attributes => { - group => $subscription->{group_name}, - }, - ); - try { - my $context = OpenTelemetry::Trace->context_with_span($span); - dynamically OpenTelemetry::Context->current = $context; - - $subscription->{sink}->emit($messages->{$event_id}); - await $transport->ack_message( - $subscription->{channel}, - $subscription->{group_name}, - $event_id - ); - - $span->set_status( - SPAN_STATUS_OK - ); - } catch ($e) { - $e = Myriad::Exception::InternalError->new( - reason => $e - ) unless blessed($e) and $e->DOES('Myriad::Exception'); - $log->errorf('Failed to process event %s - %s', $event_id, $e); - $span->record_exception($e); - $span->set_status( - SPAN_STATUS_ERROR, $e + if(USE_OPENTELEMETRY) { + for my $event_id (sort keys $messages->%*) { + my $span = $tracer->create_span( + parent => OpenTelemetry::Context->current, + name => $subscription->{channel}, + attributes => { + group => $subscription->{group_name}, + }, ); + try { + my $context = OpenTelemetry::Trace->context_with_span($span); + dynamically OpenTelemetry::Context->current = $context; + + $subscription->{sink}->emit($messages->{$event_id}); + await $transport->ack_message( + $subscription->{channel}, + $subscription->{group_name}, + $event_id + ); + + $span->set_status( + SPAN_STATUS_OK + ); + } catch ($e) { + $e = Myriad::Exception::InternalError->new( + reason => $e + ) unless blessed($e) and $e->DOES('Myriad::Exception'); + $log->errorf('Failed to process event %s - %s', $event_id, $e); + $span->record_exception($e); + $span->set_status( + SPAN_STATUS_ERROR, $e + ); + } + } + } else { + for my $event_id (sort keys $messages->%*) { + try { + $subscription->{sink}->emit($messages->{$event_id}); + await $transport->ack_message( + $subscription->{channel}, + $subscription->{group_name}, + $event_id + ); + } catch ($e) { + $e = Myriad::Exception::InternalError->new( + reason => $e + ) unless blessed($e) and $e->DOES('Myriad::Exception'); + $log->errorf('Failed to process event %s - %s', $event_id, $e); + } } } diff --git a/lib/Myriad/Subscription/Implementation/Redis.pm b/lib/Myriad/Subscription/Implementation/Redis.pm index 18484eb6..f426ee49 100644 --- a/lib/Myriad/Subscription/Implementation/Redis.pm +++ b/lib/Myriad/Subscription/Implementation/Redis.pm @@ -9,13 +9,19 @@ use Myriad::Class ':v2', extends => qw(IO::Async::Notifier), does => [ use Myriad::Util::UUID; use Compress::Zstd (); -use OpenTelemetry::Context; -use OpenTelemetry::Trace; -use OpenTelemetry::Constants qw( SPAN_STATUS_ERROR SPAN_STATUS_OK ); use constant MAX_ALLOWED_STREAM_LENGTH => 10_000; -use constant USE_OPENTELEMETRY => 0; +use constant USE_OPENTELEMETRY => $ENV{USE_OPENTELEMETRY}; + +BEGIN { + if(USE_OPENTELEMETRY) { + require OpenTelemetry::Context; + require OpenTelemetry::Trace; + require OpenTelemetry::Constants; + OpenTelemetry::Constants->import(qw( SPAN_STATUS_ERROR SPAN_STATUS_OK )); + } +} field $redis;