Skip to content

Commit

Permalink
Merge pull request #333 from tom-binary/bugfix/disable_opentelemetry_…
Browse files Browse the repository at this point in the history
…by_default

Only enable OpenTelemetry when explicitly requested
  • Loading branch information
tom-binary authored Jul 22, 2024
2 parents cdd8fbb + 81db812 commit 213e18a
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 50 deletions.
34 changes: 26 additions & 8 deletions lib/Myriad/Class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,21 @@ use Heap;
use IO::Async::Notifier;

use Log::Any qw($log);
use OpenTelemetry;
use OpenTracing::Any qw($tracer);
use Metrics::Any;

use constant USE_OPENTELEMETRY => $ENV{USE_OPENTELEMETRY};

BEGIN {
if(USE_OPENTELEMETRY) {
require OpenTelemetry;
require OpenTelemetry::Context;
require OpenTelemetry::Trace;
require OpenTelemetry::Constants;
}
}

require OpenTracing::Any;

sub import {
my $called_on = shift;

Expand Down Expand Up @@ -359,12 +370,19 @@ sub import {
{
no strict 'refs';
if($version >= 2) {
my $provider = OpenTelemetry->tracer_provider;
*{$pkg . '::log'} = \(OpenTelemetry->logger);
*{$pkg . '::tracer'} = \($provider->tracer(
name => 'myriad',
version => $version,
));
if(USE_OPENTELEMETRY) {
my $provider = OpenTelemetry->tracer_provider;
*{$pkg . '::log'} = \(OpenTelemetry->logger);
*{$pkg . '::tracer'} = \($provider->tracer(
name => 'myriad',
version => $version,
));
} else {
*{$pkg . '::log'} = \Log::Any->get_logger(
category => $pkg
);
*{$pkg . '::tracer'} = \undef;
}
} else {
# Essentially the same as importing Log::Any qw($log) for now,
# but we may want to customise this with some additional attributes.
Expand Down
15 changes: 10 additions & 5 deletions lib/Myriad/Service/Implementation.pm
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ use Myriad::Util::UUID;

use Myriad::Service::Attributes;

use OpenTelemetry::Context;
use OpenTelemetry::Trace;
use OpenTelemetry::Constants qw( SPAN_STATUS_ERROR SPAN_STATUS_OK );

# Only defer up to this many seconds between batch iterations
use constant MAX_EXPONENTIAL_BACKOFF => 2;

use constant USE_OPENTELEMETRY => 0;
use constant USE_OPENTELEMETRY => $ENV{USE_OPENTELEMETRY} || 0;

BEGIN {
if(USE_OPENTELEMETRY) {
require OpenTelemetry::Context;
require OpenTelemetry::Trace;
require OpenTelemetry::Constants;
OpenTelemetry::Constants->import(qw( SPAN_STATUS_ERROR SPAN_STATUS_OK ));
}
}

sub MODIFY_CODE_ATTRIBUTES {
my ($class, $code, @attrs) = @_;
Expand Down
91 changes: 58 additions & 33 deletions lib/Myriad/Subscription/Implementation/Memory.pm
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@ use Myriad::Class ':v2', extends => qw(IO::Async::Notifier), does => [
'Myriad::Util::Defer'
];

use OpenTelemetry::Context;
use OpenTelemetry::Trace;
use OpenTelemetry::Constants qw( SPAN_STATUS_ERROR SPAN_STATUS_OK );
use constant USE_OPENTELEMETRY => $ENV{USE_OPENTELEMETRY};

BEGIN {
if(USE_OPENTELEMETRY) {
require OpenTelemetry::Context;
require OpenTelemetry::Trace;
require OpenTelemetry::Constants;
OpenTelemetry::Constants->import(qw( SPAN_STATUS_ERROR SPAN_STATUS_OK ));
}
}

# VERSION
# AUTHORITY
Expand Down Expand Up @@ -96,37 +103,55 @@ async method start {
$subscription->{group_name},
'consumer'
);
for my $event_id (sort keys $messages->%*) {
my $span = $tracer->create_span(
parent => OpenTelemetry::Context->current,
name => $subscription->{channel},
attributes => {
group => $subscription->{group_name},
},
);
try {
my $context = OpenTelemetry::Trace->context_with_span($span);
dynamically OpenTelemetry::Context->current = $context;

$subscription->{sink}->emit($messages->{$event_id});
await $transport->ack_message(
$subscription->{channel},
$subscription->{group_name},
$event_id
);

$span->set_status(
SPAN_STATUS_OK
);
} catch ($e) {
$e = Myriad::Exception::InternalError->new(
reason => $e
) unless blessed($e) and $e->DOES('Myriad::Exception');
$log->errorf('Failed to process event %s - %s', $event_id, $e);
$span->record_exception($e);
$span->set_status(
SPAN_STATUS_ERROR, $e
if(USE_OPENTELEMETRY) {
for my $event_id (sort keys $messages->%*) {
my $span = $tracer->create_span(
parent => OpenTelemetry::Context->current,
name => $subscription->{channel},
attributes => {
group => $subscription->{group_name},
},
);
try {
my $context = OpenTelemetry::Trace->context_with_span($span);
dynamically OpenTelemetry::Context->current = $context;

$subscription->{sink}->emit($messages->{$event_id});
await $transport->ack_message(
$subscription->{channel},
$subscription->{group_name},
$event_id
);

$span->set_status(
SPAN_STATUS_OK
);
} catch ($e) {
$e = Myriad::Exception::InternalError->new(
reason => $e
) unless blessed($e) and $e->DOES('Myriad::Exception');
$log->errorf('Failed to process event %s - %s', $event_id, $e);
$span->record_exception($e);
$span->set_status(
SPAN_STATUS_ERROR, $e
);
}
}
} else {
for my $event_id (sort keys $messages->%*) {
try {
$subscription->{sink}->emit($messages->{$event_id});
await $transport->ack_message(
$subscription->{channel},
$subscription->{group_name},
$event_id
);
} catch ($e) {
$e = Myriad::Exception::InternalError->new(
reason => $e
) unless blessed($e) and $e->DOES('Myriad::Exception');
$log->errorf('Failed to process event %s - %s', $event_id, $e);
}
}
}

Expand Down
14 changes: 10 additions & 4 deletions lib/Myriad/Subscription/Implementation/Redis.pm
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@ use Myriad::Class ':v2', extends => qw(IO::Async::Notifier), does => [

use Myriad::Util::UUID;
use Compress::Zstd ();
use OpenTelemetry::Context;
use OpenTelemetry::Trace;
use OpenTelemetry::Constants qw( SPAN_STATUS_ERROR SPAN_STATUS_OK );

use constant MAX_ALLOWED_STREAM_LENGTH => 10_000;

use constant USE_OPENTELEMETRY => 0;
use constant USE_OPENTELEMETRY => $ENV{USE_OPENTELEMETRY};

BEGIN {
if(USE_OPENTELEMETRY) {
require OpenTelemetry::Context;
require OpenTelemetry::Trace;
require OpenTelemetry::Constants;
OpenTelemetry::Constants->import(qw( SPAN_STATUS_ERROR SPAN_STATUS_OK ));
}
}

field $redis;

Expand Down

0 comments on commit 213e18a

Please sign in to comment.