Skip to content

Commit

Permalink
feat: metrics integration for sidekiq
Browse files Browse the repository at this point in the history
  • Loading branch information
zvkemp committed Jan 24, 2025
1 parent 5f393f8 commit 8e4e07f
Show file tree
Hide file tree
Showing 12 changed files with 408 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
#
# SPDX-License-Identifier: Apache-2.0

begin
require 'opentelemetry-metrics-api'
rescue LoadError
end

module OpenTelemetry
module Instrumentation
# Extensions to Instrumentation::Base that handle metrics instruments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Gem::Specification.new do |spec|

spec.add_dependency 'opentelemetry-api', '~> 1.0'
spec.add_dependency 'opentelemetry-instrumentation-base', '~> 0.23.0'
spec.add_dependency 'opentelemetry-metrics-api', '~> 1.0'

spec.add_development_dependency 'appraisal', '~> 2.5'
spec.add_development_dependency 'bundler', '~> 2.4'
Expand Down
53 changes: 34 additions & 19 deletions instrumentation/sidekiq/Appraisals
Original file line number Diff line number Diff line change
@@ -1,24 +1,39 @@
# frozen_string_literal: true

appraise 'sidekiq-7.0' do
gem 'sidekiq', '~> 7.0'
end

appraise 'sidekiq-6.5' do
gem 'sidekiq', '>= 6.5', '< 7.0'
end
{
'sidekiq-7.0' => [['sidekiq', '~> 7.0']],
'sidekiq-6.5' => [['sidekiq', '>= 6.5', '< 7.0']],
'sidekiq-6.0' => [
['sidekiq', '>= 6.0', '< 6.5'],
['redis', '< 4.8']
],
'sidekiq-5.2' => [
['sidekiq', '~> 5.2'],
['redis', '< 4.8']
],
'sidekiq-4.2' => [
['sidekiq', '~> 4.2'],
['redis', '< 4.8']
]
}.each do |gemfile_name, specs|
appraise gemfile_name do
specs.each do |spec|
gem(*spec)
remove_gem 'opentelemetry-metrics-api'
remove_gem 'opentelemetry-metrics-sdk'
end
end

appraise 'sidekiq-6.0' do
gem 'sidekiq', '>= 6.0', '< 6.5'
gem 'redis', '< 4.8'
end

appraise 'sidekiq-5.2' do
gem 'sidekiq', '~> 5.2'
gem 'redis', '< 4.8'
end
appraise "#{gemfile_name}-metrics-api" do
specs.each do |spec|
gem(*spec)
remove_gem 'opentelemetry-metrics-sdk'
end
end

appraise 'sidekiq-4.2' do
gem 'sidekiq', '~> 4.2'
gem 'redis', '< 4.8'
appraise "#{gemfile_name}-metrics-sdk" do
specs.each do |spec|
gem(*spec)
end
end
end
5 changes: 4 additions & 1 deletion instrumentation/sidekiq/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

source 'https://rubygems.org'

gem 'opentelemetry-metrics-api', '~> 0.2.0'
gem 'pry-byebug'

gemspec

group :test do
gem 'opentelemetry-instrumentation-base', path: '../base'
gem 'opentelemetry-instrumentation-redis', path: '../redis'
gem 'pry-byebug'
gem 'opentelemetry-metrics-sdk'
end
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base
option :trace_poller_wait, default: false, validate: :boolean
option :trace_processor_process_one, default: false, validate: :boolean
option :peer_service, default: nil, validate: :string
option :metrics, default: false, validate: :boolean

counter 'messaging.client.sent.messages'
histogram 'messaging.client.operation.duration', unit: 's'
counter 'messaging.client.consumed.messages'
histogram 'messaging.process.duration', unit: 's'

# TODO: https://github.com/open-telemetry/semantic-conventions/pull/1812
gauge 'messaging.queue.latency', unit: 's'

private

Expand All @@ -115,6 +124,7 @@ def gem_version
end

def require_dependencies
require_relative 'middlewares/common'
require_relative 'middlewares/client/tracer_middleware'
require_relative 'middlewares/server/tracer_middleware'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#
# SPDX-License-Identifier: Apache-2.0

require_relative '../common'

module OpenTelemetry
module Instrumentation
module Sidekiq
Expand All @@ -12,6 +14,7 @@ module Client
# TracerMiddleware propagates context and instruments Sidekiq client
# by way of its middleware system
class TracerMiddleware
include Common
include ::Sidekiq::ClientMiddleware if defined?(::Sidekiq::ClientMiddleware)

def call(_worker_class, job, _queue, _redis_pool)
Expand All @@ -33,17 +36,49 @@ def call(_worker_class, job, _queue, _redis_pool)
OpenTelemetry.propagation.inject(job)
span.add_event('created_at', timestamp: job['created_at'])
yield
end.tap do # rubocop: disable Style/MultilineBlockChain
count_sent_message(job)
end
end

private

def instrumentation_config
Sidekiq::Instrumentation.instance.config
def count_sent_message(job)
with_meter do |_meter|
counter_attributes = metrics_attributes(job).merge(
{
'messaging.operation.name' => 'create'
# server.address => # FIXME: required if available
# messaging.destination.partition.id => FIXME: recommended
# server.port => # FIXME: recommended
}
)

counter = messaging_client_sent_messages_counter
counter.add(1, attributes: counter_attributes)
end
end

def messaging_client_sent_messages_counter
instrumentation.counter('messaging.client.sent.messages')
end

def tracer
Sidekiq::Instrumentation.instance.tracer
instrumentation.tracer
end

def with_meter(&block)
instrumentation.with_meter(&block)
end

def metrics_attributes(job)
{
'messaging.system' => 'sidekiq', # FIXME: metrics semconv
'messaging.destination.name' => job['queue'] # FIXME: metrics semconv
# server.address => # FIXME: required if available
# messaging.destination.partition.id => FIXME: recommended
# server.port => # FIXME: recommended
}
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Instrumentation
module Sidekiq
module Middlewares
# Common logic for server and client middlewares
module Common
private

def instrumentation
Sidekiq::Instrumentation.instance
end

def instrumentation_config
Sidekiq::Instrumentation.instance.config
end

# Bypasses _all_ enclosed logic unless metrics are enabled
def with_meter(&block)
instrumentation.with_meter(&block)
end

# time an inner block and yield the duration to the given callback
def timed(callback)
return yield unless metrics_enabled?

t0 = monotonic_now

yield.tap do
callback.call(monotonic_now - t0)
end
end

def realtime_now
Process.clock_gettime(Process::CLOCK_REALTIME)
end

def monotonic_now
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def tracer
instrumentation.tracer
end

def metrics_enabled?
instrumentation.metrics_enabled?
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#
# SPDX-License-Identifier: Apache-2.0

require_relative '../common'

module OpenTelemetry
module Instrumentation
module Sidekiq
Expand All @@ -12,6 +14,7 @@ module Server
# TracerMiddleware propagates context and instruments Sidekiq requests
# by way of its middleware system
class TracerMiddleware
include Common
include ::Sidekiq::ServerMiddleware if defined?(::Sidekiq::ServerMiddleware)

def call(_worker, msg, _queue)
Expand All @@ -32,40 +35,91 @@ def call(_worker, msg, _queue)

extracted_context = OpenTelemetry.propagation.extract(msg)
OpenTelemetry::Context.with_current(extracted_context) do
if instrumentation_config[:propagation_style] == :child
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
span.add_event('created_at', timestamp: msg['created_at'])
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
yield
end
else
links = []
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid?
span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
OpenTelemetry::Trace.with_span(span) do
span.add_event('created_at', timestamp: msg['created_at'])
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
yield
rescue Exception => e # rubocop:disable Lint/RescueException
span.record_exception(e)
span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
raise e
ensure
span.finish
track_queue_latency(msg)

timed(track_process_time_callback(msg)) do
if instrumentation_config[:propagation_style] == :child
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
span.add_event('created_at', timestamp: msg['created_at'])
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
yield
end
else
links = []
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid?
span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
OpenTelemetry::Trace.with_span(span) do
span.add_event('created_at', timestamp: msg['created_at'])
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
yield
rescue Exception => e # rubocop:disable Lint/RescueException
span.record_exception(e)
span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
raise e
ensure
span.finish
end
end
end

count_consumed_message(msg)
end
end

private

def instrumentation_config
Sidekiq::Instrumentation.instance.config
def track_queue_latency(msg)
with_meter do
return unless (enqueued_at = msg['enqueued_at'])
return unless enqueued_at.is_a?(Numeric)

latency = (realtime_now - enqueued_at).abs

queue_latency_gauge&.record(latency, attributes: metrics_attributes(msg))
end
end

def track_process_time_callback(msg)
->(duration) { track_process_time(msg, duration) }
end

def track_process_time(msg, duration)
with_meter do
attributes = metrics_attributes(msg).merge(
{ 'messaging.operation.name' => 'process' }
)
messaging_process_duration_histogram&.record(duration, attributes: attributes)
end
end

def messaging_process_duration_histogram
instrumentation.histogram('messaging.process.duration')
end

def count_consumed_message(msg)
with_meter do
messaging_client_consumed_messages_counter.add(1, attributes: metrics_attributes(msg))
end
end

def tracer
Sidekiq::Instrumentation.instance.tracer
def messaging_client_consumed_messages_counter
instrumentation.counter('messaging.client.consumed.messages')
end

def queue_latency_gauge
instrumentation.gauge('messaging.queue.latency')
end

# FIXME: dedupe
def metrics_attributes(msg)
{
'messaging.system' => 'sidekiq', # FIXME: metrics semconv
'messaging.destination.name' => msg['queue'] # FIXME: metrics semconv
# server.address => # FIXME: required if available
# messaging.destination.partition.id => FIXME: recommended
# server.port => # FIXME: recommended
}
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'appraisal', '~> 2.5'
spec.add_development_dependency 'bundler', '~> 2.4'
spec.add_development_dependency 'minitest', '~> 5.0'
spec.add_development_dependency 'minitest-reporters'
spec.add_development_dependency 'opentelemetry-sdk', '~> 1.1'
spec.add_development_dependency 'opentelemetry-test-helpers', '~> 0.3'
spec.add_development_dependency 'rspec-mocks'
Expand Down
Loading

0 comments on commit 8e4e07f

Please sign in to comment.