diff --git a/Rakefile b/Rakefile index 4bf70667735..f9ccd0084da 100644 --- a/Rakefile +++ b/Rakefile @@ -27,6 +27,14 @@ CORE_WITH_LIBDATADOG_API = [ 'spec/datadog/core/process_discovery_spec.rb', 'spec/datadog/core/configuration/stable_config_spec.rb', 'spec/datadog/core/ddsketch_spec.rb', + 'spec/datadog/data_streams/**/*_spec.rb', +].freeze + +# Data Streams Monitoring (DSM) requires libdatadog_api for DDSketch +# Add new instrumentation libraries here as they gain DSM support +DSM_ENABLED_LIBRARIES = [ + :kafka, + :karafka ].freeze # rubocop:disable Metrics/BlockLength @@ -82,8 +90,8 @@ namespace :spec do desc '' # "Explicitly hiding from `rake -T`" RSpec::Core::RakeTask.new(:main) do |t, args| t.pattern = 'spec/**/*_spec.rb' - t.exclude_pattern = 'spec/**/{appsec/integration,contrib,benchmark,redis,auto_instrument,opentelemetry,profiling,crashtracking,error_tracking,rubocop}/**/*_spec.rb,' \ - ' spec/**/{auto_instrument,opentelemetry,process_discovery,stable_config,ddsketch}_spec.rb,' \ + t.exclude_pattern = 'spec/**/{appsec/integration,contrib,benchmark,redis,auto_instrument,opentelemetry,profiling,crashtracking,error_tracking,rubocop,data_streams}/**/*_spec.rb,' \ + ' spec/**/{auto_instrument,opentelemetry,process_discovery,stable_config,ddsketch}*_spec.rb,' \ ' spec/datadog/gem_packaging_spec.rb' t.rspec_opts = args.to_a.join(' ') end @@ -295,6 +303,22 @@ namespace :spec do end end + # Ensure DSM-enabled contrib tests compile libdatadog_api before running (MRI Ruby only) + # If compilation fails (e.g., new Ruby version without prebuilt extension), tests will skip via DDSketch.supported? + unless RUBY_PLATFORM == 'java' + task :compile_libdatadog_for_dsm do + Rake::Task["compile:libdatadog_api.#{RUBY_VERSION[/\d+.\d+/]}_#{RUBY_PLATFORM}"].invoke + rescue => e + # Compilation failed (likely unsupported Ruby version) - tests will skip gracefully + puts "Warning: libdatadog_api compilation failed: #{e.class}: #{e}" + puts "DSM tests will be skipped for this Ruby version" + end + + DSM_ENABLED_LIBRARIES.each do |task_name| + Rake::Task["spec:#{task_name}"].enhance([:compile_libdatadog_for_dsm]) + end + end + namespace :appsec do task all: [ :main, diff --git a/Steepfile b/Steepfile index 3a47489f30a..1e46397751f 100644 --- a/Steepfile +++ b/Steepfile @@ -137,6 +137,7 @@ target :datadog do ignore 'lib/datadog/core/utils/time.rb' ignore 'lib/datadog/core/vendor/multipart-post/multipart/post/multipartable.rb' ignore 'lib/datadog/core/worker.rb' + ignore 'lib/datadog/data_streams/configuration/settings.rb' ignore 'lib/datadog/core/workers/async.rb' ignore 'lib/datadog/core/workers/interval_loop.rb' ignore 'lib/datadog/core/workers/polling.rb' diff --git a/lib/datadog.rb b/lib/datadog.rb index f09d63a60e5..224756cb4df 100644 --- a/lib/datadog.rb +++ b/lib/datadog.rb @@ -8,6 +8,7 @@ require_relative 'datadog/profiling' require_relative 'datadog/appsec' require_relative 'datadog/di' +require_relative 'datadog/data_streams' # Line probes will not work on Ruby < 2.6 because of lack of :script_compiled # trace point. 
Activate DI automatically on supported Ruby versions but diff --git a/lib/datadog/core/configuration/components.rb b/lib/datadog/core/configuration/components.rb index 09fccc0879d..0ea3a1dabe2 100644 --- a/lib/datadog/core/configuration/components.rb +++ b/lib/datadog/core/configuration/components.rb @@ -19,6 +19,7 @@ require_relative '../crashtracking/component' require_relative '../environment/agent_info' require_relative '../process_discovery' +require_relative '../../data_streams/processor' module Datadog module Core @@ -75,6 +76,20 @@ def build_crashtracker(settings, agent_settings, logger:) Datadog::Core::Crashtracking::Component.build(settings, agent_settings, logger: logger) end + + def build_data_streams(settings, agent_settings, logger) + return unless settings.data_streams.enabled + + Datadog::DataStreams::Processor.new( + interval: settings.data_streams.interval, + logger: logger, + settings: settings, + agent_settings: agent_settings + ) + rescue => e + logger.warn("Failed to initialize Data Streams Monitoring: #{e.class}: #{e}") + nil + end end attr_reader \ @@ -90,7 +105,8 @@ def build_crashtracker(settings, agent_settings, logger:) :error_tracking, :dynamic_instrumentation, :appsec, - :agent_info + :agent_info, + :data_streams def initialize(settings) @settings = settings @@ -126,6 +142,7 @@ def initialize(settings) @appsec = Datadog::AppSec::Component.build_appsec_component(settings, telemetry: telemetry) @dynamic_instrumentation = Datadog::DI::Component.build(settings, agent_settings, @logger, telemetry: telemetry) @error_tracking = Datadog::ErrorTracking::Component.build(settings, @tracer, @logger) + @data_streams = self.class.build_data_streams(settings, agent_settings, @logger) @environment_logger_extra[:dynamic_instrumentation_enabled] = !!@dynamic_instrumentation # Configure non-privileged components. @@ -195,6 +212,9 @@ def shutdown!(replacement = nil) # Shutdown workers runtime_metrics.stop(true, close_metrics: false) + # Shutdown Data Streams Monitoring processor + data_streams&.stop(true) + # Shutdown the old metrics, unless they are still being used. # (e.g. custom Statsd instances.) # diff --git a/lib/datadog/core/configuration/supported_configurations.rb b/lib/datadog/core/configuration/supported_configurations.rb index b6e26ccfab4..707787510d1 100644 --- a/lib/datadog/core/configuration/supported_configurations.rb +++ b/lib/datadog/core/configuration/supported_configurations.rb @@ -33,6 +33,7 @@ module Configuration "DD_APPSEC_WAF_DEBUG" => {version: ["A"]}, "DD_APPSEC_WAF_TIMEOUT" => {version: ["A"]}, "DD_CRASHTRACKING_ENABLED" => {version: ["A"]}, + "DD_DATA_STREAMS_ENABLED" => {version: ["A"]}, "DD_DBM_PROPAGATION_MODE" => {version: ["A"]}, "DD_DISABLE_DATADOG_RAILS" => {version: ["A"]}, "DD_DYNAMIC_INSTRUMENTATION_ENABLED" => {version: ["A"]}, diff --git a/lib/datadog/core/ddsketch.rb b/lib/datadog/core/ddsketch.rb index 51de65b1983..834d2b61b85 100644 --- a/lib/datadog/core/ddsketch.rb +++ b/lib/datadog/core/ddsketch.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require 'datadog/core' - module Datadog module Core # Used to access ddsketch APIs. 
diff --git a/lib/datadog/data_streams.rb b/lib/datadog/data_streams.rb new file mode 100644 index 00000000000..abd9187cd89 --- /dev/null +++ b/lib/datadog/data_streams.rb @@ -0,0 +1,100 @@ +# frozen_string_literal: true + +require_relative 'data_streams/processor' +require_relative 'data_streams/pathway_context' +require_relative 'data_streams/configuration/settings' +require_relative 'data_streams/extensions' +require_relative 'core/utils/time' + +module Datadog + # Datadog Data Streams Monitoring public API. + # + # The Datadog team ensures that public methods in this module + # only receive backwards compatible changes, and breaking changes + # will only occur in new major versions releases. + # @public_api + module DataStreams + class << self + # Set a produce checkpoint for Data Streams Monitoring + # + # @param type [String] The type of the checkpoint (e.g., 'kafka', 'kinesis', 'sns') + # @param destination [String] The destination (e.g., topic, exchange, stream name) + # @param auto_instrumentation [Boolean] Whether this checkpoint was set by auto-instrumentation (default: false) + # @param tags [Hash] Additional tags to include + # @yield [key, value] Block to inject context into carrier + # @return [String, nil] Base64 encoded pathway context or nil if disabled + # @public_api + def set_produce_checkpoint(type:, destination:, auto_instrumentation: false, tags: {}, &block) + processor&.set_produce_checkpoint( + type: type, + destination: destination, + manual_checkpoint: !auto_instrumentation, + tags: tags, + &block + ) + end + + # Set a consume checkpoint for Data Streams Monitoring + # + # @param type [String] The type of the checkpoint (e.g., 'kafka', 'kinesis', 'sns') + # @param source [String] The source (e.g., topic, exchange, stream name) + # @param auto_instrumentation [Boolean] Whether this checkpoint was set by auto-instrumentation (default: false) + # @param tags [Hash] Additional tags to include + # @yield [key] Block to extract context from carrier + # @return [String, nil] Base64 encoded pathway context or nil if disabled + # @public_api + def set_consume_checkpoint(type:, source:, auto_instrumentation: false, tags: {}, &block) + processor&.set_consume_checkpoint( + type: type, + source: source, + manual_checkpoint: !auto_instrumentation, + tags: tags, + &block + ) + end + + # Track Kafka produce offset for lag monitoring + # + # @param topic [String] The Kafka topic name + # @param partition [Integer] The partition number + # @param offset [Integer] The offset of the produced message + # @return [Boolean, nil] true if tracking succeeded, nil if disabled + # @!visibility private + def track_kafka_produce(topic, partition, offset) + processor&.track_kafka_produce(topic, partition, offset, Core::Utils::Time.now) + end + + # Track Kafka message consumption for consumer lag monitoring + # + # @param topic [String] The Kafka topic name + # @param partition [Integer] The partition number + # @param offset [Integer] The offset of the consumed message + # @return [Boolean, nil] true if tracking succeeded, nil if disabled + # @!visibility private + def track_kafka_consume(topic, partition, offset) + processor&.track_kafka_consume(topic, partition, offset, Core::Utils::Time.now) + end + + # Check if Data Streams Monitoring is enabled and available + # + # @return [Boolean] true if the processor is available + # @public_api + def enabled? + !processor.nil? 
+ end + + private + + def processor + components.data_streams + end + + def components + Datadog.send(:components) + end + end + + # Expose Data Streams to global shared objects + Extensions.activate! + end +end diff --git a/lib/datadog/data_streams/configuration.rb b/lib/datadog/data_streams/configuration.rb new file mode 100644 index 00000000000..4d70270eff9 --- /dev/null +++ b/lib/datadog/data_streams/configuration.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +require_relative 'configuration/settings' + +module Datadog + module DataStreams + # Configuration for Data Streams Monitoring + module Configuration + end + end +end diff --git a/lib/datadog/data_streams/configuration/settings.rb b/lib/datadog/data_streams/configuration/settings.rb new file mode 100644 index 00000000000..29bf7d7bc83 --- /dev/null +++ b/lib/datadog/data_streams/configuration/settings.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +require_relative '../../core/environment/variable_helpers' +require_relative '../ext' + +module Datadog + module DataStreams + module Configuration + # Configuration settings for Data Streams Monitoring. + module Settings + def self.extended(base) + base = base.singleton_class unless base.is_a?(Class) + add_settings!(base) + end + + def self.add_settings!(base) + base.class_eval do + # Data Streams Monitoring configuration + # @public_api + settings :data_streams do + # Whether Data Streams Monitoring is enabled. When enabled, the library will + # collect and report data lineage information for messaging systems. + # + # @default `DD_DATA_STREAMS_ENABLED` environment variable, otherwise `false`. + # @return [Boolean] + option :enabled do |o| + o.type :bool + o.env Ext::ENV_ENABLED + o.default false + end + + # The interval (in seconds) at which Data Streams Monitoring stats are flushed. + # + # @default 10.0 + # @env '_DD_TRACE_STATS_WRITER_INTERVAL' + # @return [Float] + # @!visibility private + option :interval do |o| + o.type :float + o.env '_DD_TRACE_STATS_WRITER_INTERVAL' + o.default 10.0 + end + end + end + end + end + end + end +end diff --git a/lib/datadog/data_streams/ext.rb b/lib/datadog/data_streams/ext.rb new file mode 100644 index 00000000000..61263888434 --- /dev/null +++ b/lib/datadog/data_streams/ext.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module Datadog + module DataStreams + # Constants for Data Streams Monitoring configuration + # @public_api + module Ext + ENV_ENABLED = 'DD_DATA_STREAMS_ENABLED' + end + end +end diff --git a/lib/datadog/data_streams/extensions.rb b/lib/datadog/data_streams/extensions.rb new file mode 100644 index 00000000000..f6d48138460 --- /dev/null +++ b/lib/datadog/data_streams/extensions.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +require_relative 'configuration' +require_relative '../core/configuration' + +module Datadog + module DataStreams + # Extends Datadog with Data Streams Monitoring features + module Extensions + # Inject Data Streams settings into global configuration. + def self.activate! 
+ Core::Configuration::Settings.extend(Configuration::Settings) + end + end + end +end diff --git a/lib/datadog/data_streams/pathway_context.rb b/lib/datadog/data_streams/pathway_context.rb new file mode 100644 index 00000000000..35e19034879 --- /dev/null +++ b/lib/datadog/data_streams/pathway_context.rb @@ -0,0 +1,169 @@ +# frozen_string_literal: true + +require 'stringio' +require_relative '../core/utils/base64' + +module Datadog + module DataStreams + # Represents a pathway context for data streams monitoring + class PathwayContext + # The current pathway hash value (result of FNV-1a hash function) + attr_accessor :hash + # When the pathway started + attr_accessor :pathway_start + # When the current edge started + attr_accessor :current_edge_start + # The hash value of the parent checkpoint + attr_accessor :parent_hash + # The direction tag of the previous checkpoint (e.g., 'direction:in', 'direction:out'), or nil if none + attr_accessor :previous_direction + # Hash value of the closest checkpoint in opposite direction (used for loop detection) + attr_accessor :closest_opposite_direction_hash + # Edge start time of the closest opposite direction checkpoint + attr_accessor :closest_opposite_direction_edge_start + + def initialize(hash_value:, pathway_start:, current_edge_start:) + @hash = hash_value + @pathway_start = pathway_start + @current_edge_start = current_edge_start + @parent_hash = nil + + @previous_direction = nil + @closest_opposite_direction_hash = 0 + @closest_opposite_direction_edge_start = current_edge_start + end + + def encode_b64 + Core::Utils::Base64.strict_encode64(encode) + end + + # Decode pathway context from base64 encoded string + def self.decode_b64(encoded_ctx) + return nil unless encoded_ctx && !encoded_ctx.empty? + + begin + binary_data = Core::Utils::Base64.strict_decode64(encoded_ctx) + decode(binary_data) + rescue ArgumentError => e + # Invalid base64 encoding - may indicate version mismatch or corruption + Datadog.logger.debug("Failed to decode DSM pathway context: #{e.message}") + nil + end + end + + private + + def encode + # Format: + # - 8 bytes: hash value (little-endian) + # - VarInt: pathway start time (milliseconds) + # - VarInt: current edge start time (milliseconds) + [@hash].pack('Q') << + encode_var_int_64(time_to_ms(@pathway_start)) << + encode_var_int_64(time_to_ms(@current_edge_start)) + end + + # Decode pathway context from binary data + def self.decode(binary_data) + return nil unless binary_data && binary_data.bytesize >= 8 + + reader = StringIO.new(binary_data) + + # Extract 8-byte hash (little-endian) + hash_bytes = reader.read(8) + return nil unless hash_bytes + + hash_value = hash_bytes.unpack1('Q') # : Integer + + # Extract pathway start time (VarInt milliseconds) + pathway_start_ms = decode_varint(reader) + return nil unless pathway_start_ms + + # Extract current edge start time (VarInt milliseconds) + current_edge_start_ms = decode_varint(reader) + return nil unless current_edge_start_ms + + # Convert milliseconds to Time objects + pathway_start = ms_to_time(pathway_start_ms) + current_edge_start = ms_to_time(current_edge_start_ms) + + new( + hash_value: hash_value, + pathway_start: pathway_start, + current_edge_start: current_edge_start + ) + rescue EOFError + # Not enough data in binary stream + nil + end + private_class_method :decode + + # Encode an unsigned 64-bit integer using LEB128 variable-length encoding. 
+ # + # This implements unsigned LEB128 (Little Endian Base 128) encoding as specified + # in DWARF5 standard section 7.6: + # https://dwarfstd.org/doc/DWARF5.pdf#page=301 + # + # Each byte uses 7 bits for data and 1 bit to indicate continuation. + # The high bit is set if more bytes follow, clear for the final byte. + # + # @param value [Integer] Unsigned integer value to encode + # @return [String] Binary string of encoded bytes + def encode_var_int_64(value) + bytes = [] + while value >= 0x80 + bytes << ((value & 0x7F) | 0x80) + value >>= 7 + end + bytes << value + bytes.pack('C*') + end + + # Decode an unsigned LEB128 variable-length integer from IO stream. + # + # This implements unsigned LEB128 (Little Endian Base 128) decoding as specified + # in DWARF5 standard section 7.6: + # https://dwarfstd.org/doc/DWARF5.pdf#page=301 + # + # VarInt format: Each byte uses 7 bits for data, 1 bit for continuation + # - High bit set = more bytes follow + # - High bit clear = final byte + # - Data bits accumulated in little-endian order + # + # @param io [StringIO] IO stream to read from + # @return [Integer, nil] Decoded unsigned integer, or nil if malformed + def self.decode_varint(io) + value = 0 + shift = 0 + + loop do + byte = io.readbyte + + # Add this byte's 7 data bits to our value + value |= (byte & 0x7F) << shift + + # If high bit is clear, we're done + return value unless (byte & 0x80).nonzero? + + shift += 7 + + # Safety: prevent infinite decoding + return nil if shift >= 64 + end + rescue EOFError + # Stream ended unexpectedly - malformed data + nil + end + private_class_method :decode_varint + + def self.ms_to_time(milliseconds) + ::Time.at(milliseconds / 1000.0) + end + private_class_method :ms_to_time + + def time_to_ms(time) + (time.to_f * 1000).to_i + end + end + end +end diff --git a/lib/datadog/data_streams/processor.rb b/lib/datadog/data_streams/processor.rb new file mode 100644 index 00000000000..c5f1090d675 --- /dev/null +++ b/lib/datadog/data_streams/processor.rb @@ -0,0 +1,509 @@ +# frozen_string_literal: true + +require 'zlib' +require_relative 'pathway_context' +require_relative 'transport/http' +require_relative '../version' +require_relative '../core/worker' +require_relative '../core/workers/polling' +require_relative '../core/ddsketch' +require_relative '../core/buffer/cruby' +require_relative '../core/utils/time' + +module Datadog + module DataStreams + # Raised when Data Streams Monitoring cannot be initialized due to missing dependencies + class UnsupportedError < StandardError; end + + # Processor for Data Streams Monitoring + # This class is responsible for collecting and reporting pathway stats + # Periodically (every interval, 10 seconds by default) flushes stats to the Datadog agent. 
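For reference, a minimal standalone sketch of the wire format that PathwayContext#encode and .decode above implement: an 8-byte checkpoint hash followed by two unsigned LEB128 timestamps in milliseconds, Base64-encoded for header propagation. The helper name `leb128_encode` and the concrete hash/timestamp values are illustrative only, and this uses the stdlib Base64 rather than the gem's internal Core::Utils::Base64.

require 'base64'

def leb128_encode(value)
  bytes = []
  while value >= 0x80
    bytes << ((value & 0x7F) | 0x80) # low 7 bits with continuation bit set
    value >>= 7
  end
  bytes << value                     # final byte, continuation bit clear
  bytes.pack('C*')
end

leb128_encode(300).bytes.map { |b| format('0x%02X', b) }
# => ["0xAC", "0x02"]  (300 = 0b100101100: low 7 bits 0b0101100 | 0x80, then 0b10)

hash_value       = 12_345
pathway_start_ms = 1_609_459_200_123 # 2021-01-01 00:00:00.123 UTC, in milliseconds
edge_start_ms    = 1_609_459_260_456

binary = [hash_value].pack('Q') +    # 8-byte hash (native-endian; little-endian on common platforms)
         leb128_encode(pathway_start_ms) +
         leb128_encode(edge_start_ms)

Base64.strict_encode64(binary)       # the value carried in the 'dd-pathway-ctx-base64' header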
+ class Processor < Core::Worker + include Core::Workers::Polling + + PROPAGATION_KEY = 'dd-pathway-ctx-base64' + + # Default buffer size for lock-free event queue + # Set to handle high-throughput scenarios (e.g., 10k events/sec for 10s interval) + DEFAULT_BUFFER_SIZE = 100_000 + + attr_reader :pathway_context, :buckets, :bucket_size_ns + + # Initialize the Data Streams Monitoring processor + # + # @param interval [Float] Flush interval in seconds (e.g., 10.0 for 10 seconds) + # @param logger [Datadog::Core::Logger] Logger instance for debugging + # @param settings [Datadog::Core::Configuration::Settings] Global configuration settings + # @param agent_settings [Datadog::Core::Configuration::AgentSettings] Agent connection settings + # @param buffer_size [Integer] Size of the lock-free event buffer for async stat collection + # (default: DEFAULT_BUFFER_SIZE). Higher values support more throughput but use more memory. + # @raise [UnsupportedError] if DDSketch is not available on this platform + def initialize(interval:, logger:, settings:, agent_settings:, buffer_size: DEFAULT_BUFFER_SIZE) + raise UnsupportedError, 'DDSketch is not supported' unless Datadog::Core::DDSketch.supported? + + @settings = settings + @agent_settings = agent_settings + @logger = logger + + now = Core::Utils::Time.now + @pathway_context = PathwayContext.new( + hash_value: 0, + pathway_start: now, + current_edge_start: now + ) + @bucket_size_ns = (interval * 1e9).to_i + @buckets = {} + @consumer_stats = [] + @stats_mutex = Mutex.new + @event_buffer = Core::Buffer::CRuby.new(buffer_size) + + super() + self.loop_base_interval = interval + + perform + end + + # Track Kafka produce offset for lag monitoring + # @param topic [String] The Kafka topic name + # @param partition [Integer] The partition number + # @param offset [Integer] The offset of the produced message + # @param now [Time] Timestamp + # @return [Boolean] true if tracking succeeded + def track_kafka_produce(topic, partition, offset, now) + @event_buffer.push( + { + type: :kafka_produce, + topic: topic, + partition: partition, + offset: offset, + timestamp_ns: (now.to_f * 1e9).to_i + } + ) + true + end + + # Track Kafka message consumption for consumer lag monitoring + # @param topic [String] The Kafka topic name + # @param partition [Integer] The partition number + # @param offset [Integer] The offset of the consumed message + # @param now [Time] Timestamp + # @return [Boolean] true if tracking succeeded + def track_kafka_consume(topic, partition, offset, now) + @event_buffer.push( + { + type: :kafka_consume, + topic: topic, + partition: partition, + offset: offset, + timestamp: now + } + ) + true + end + + # Set a produce checkpoint + # @param type [String] The type of the checkpoint (e.g., 'kafka', 'kinesis', 'sns') + # @param destination [String] The destination (e.g., topic, exchange, stream name) + # @param manual_checkpoint [Boolean] Whether this checkpoint was manually set (default: true) + # @param tags [Hash] Additional tags to include + # @yield [key, value] Block to inject context into carrier + # @return [String] Base64 encoded pathway context + def set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}, &block) + checkpoint_tags = ["type:#{type}", "topic:#{destination}", 'direction:out'] + checkpoint_tags << 'manual_checkpoint:true' if manual_checkpoint + checkpoint_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty? 
+ + span = Datadog::Tracing.active_span + pathway = set_checkpoint(tags: checkpoint_tags, span: span) + + yield(PROPAGATION_KEY, pathway) if pathway && block + + pathway + end + + # Set a consume checkpoint + # @param type [String] The type of the checkpoint (e.g., 'kafka', 'kinesis', 'sns') + # @param source [String] The source (e.g., topic, exchange, stream name) + # @param manual_checkpoint [Boolean] Whether this checkpoint was manually set (default: true) + # @param tags [Hash] Additional tags to include + # @yield [key] Block to extract context from carrier + # @return [String] Base64 encoded pathway context + def set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}, &block) + if block + pathway_ctx = yield(PROPAGATION_KEY) + if pathway_ctx + decoded_ctx = decode_pathway_b64(pathway_ctx) + set_pathway_context(decoded_ctx) + end + end + + checkpoint_tags = ["type:#{type}", "topic:#{source}", 'direction:in'] + checkpoint_tags << 'manual_checkpoint:true' if manual_checkpoint + checkpoint_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty? + + span = Datadog::Tracing.active_span + set_checkpoint(tags: checkpoint_tags, span: span) + end + + # Called periodically by the worker to flush stats to the agent + def perform + process_events + flush_stats + true + end + + private + + # Drain event buffer and apply updates to shared data structures + # This runs in the background worker thread, not the critical path + def process_events + events = @event_buffer.pop + return if events.empty? + + @stats_mutex.synchronize do + events.each do |event_obj| + # Buffer stores Objects; we know they're hashes with symbol keys + event = event_obj # : ::Hash[::Symbol, untyped] + case event[:type] + when :kafka_produce + process_kafka_produce_event(event) + when :kafka_consume + process_kafka_consume_event(event) + when :checkpoint + process_checkpoint_event(event) + end + end + end + end + + def process_kafka_produce_event(event) + partition_key = "#{event[:topic]}:#{event[:partition]}" + bucket_time_ns = event[:timestamp_ns] - (event[:timestamp_ns] % @bucket_size_ns) + bucket = @buckets[bucket_time_ns] ||= create_bucket + + bucket[:latest_produce_offsets][partition_key] = [ + event[:offset], + bucket[:latest_produce_offsets][partition_key] || 0 + ].max + end + + def process_kafka_consume_event(event) + @consumer_stats << { + topic: event[:topic], + partition: event[:partition], + offset: event[:offset], + timestamp: event[:timestamp], + timestamp_sec: event[:timestamp].to_f + } + + timestamp_ns = (event[:timestamp].to_f * 1e9).to_i + bucket_time_ns = timestamp_ns - (timestamp_ns % @bucket_size_ns) + @buckets[bucket_time_ns] ||= create_bucket + + # Track offset gaps for lag detection + partition_key = "#{event[:topic]}:#{event[:partition]}" + @latest_consumer_offsets ||= {} + previous_offset = @latest_consumer_offsets[partition_key] || 0 + + if event[:offset] > previous_offset + 1 + @consumer_lag_events ||= [] + @consumer_lag_events << { + topic: event[:topic], + partition: event[:partition], + expected_offset: previous_offset + 1, + actual_offset: event[:offset], + gap_size: event[:offset] - previous_offset - 1, + timestamp_sec: event[:timestamp].to_f + } + end + + @latest_consumer_offsets[partition_key] = [event[:offset], previous_offset].max + end + + def process_checkpoint_event(event) + now_ns = (event[:timestamp_sec] * 1e9).to_i + bucket_time_ns = now_ns - (now_ns % @bucket_size_ns) + bucket = @buckets[bucket_time_ns] ||= create_bucket + + aggr_key = 
[event[:tags].join(','), event[:hash], event[:parent_hash]] + stats = bucket[:pathway_stats][aggr_key] ||= create_pathway_stats + + stats[:edge_latency].add(event[:edge_latency_sec]) + stats[:full_pathway_latency].add(event[:full_pathway_latency_sec]) + end + + def encode_pathway_context + @pathway_context.encode_b64 + end + + def set_checkpoint(tags:, now: nil, payload_size: 0, span: nil) + now ||= Core::Utils::Time.now + + current_context = get_current_context + tags = tags.sort + + direction = nil + tags.each do |tag| + if tag.start_with?('direction:') + direction = tag + break + end + end + + # Loop detection: consecutive same-direction checkpoints reuse the opposite direction's hash + if direction && direction == current_context.previous_direction + current_context.hash = current_context.closest_opposite_direction_hash + if current_context.hash == 0 + current_context.current_edge_start = now + current_context.pathway_start = now + else + current_context.current_edge_start = current_context.closest_opposite_direction_edge_start + end + else + current_context.previous_direction = direction + current_context.closest_opposite_direction_hash = current_context.hash + current_context.closest_opposite_direction_edge_start = current_context.current_edge_start + end + + parent_hash = current_context.hash + new_hash = compute_pathway_hash(parent_hash, tags) + + # Tag the APM span with the pathway hash to link DSM and APM + span&.set_tag('pathway.hash', new_hash.to_s) + + edge_latency_sec = [now - current_context.current_edge_start, 0.0].max + full_pathway_latency_sec = [now - current_context.pathway_start, 0.0].max + + record_checkpoint_stats( + hash: new_hash, + parent_hash: parent_hash, + edge_latency_sec: edge_latency_sec, + full_pathway_latency_sec: full_pathway_latency_sec, + payload_size: payload_size, + tags: tags, + timestamp_sec: now.to_f + ) + + current_context.parent_hash = current_context.hash + current_context.hash = new_hash + current_context.current_edge_start = now + + current_context.encode_b64 + end + + def decode_pathway_context(encoded_ctx) + PathwayContext.decode_b64(encoded_ctx) + end + + def decode_pathway_b64(encoded_ctx) + PathwayContext.decode_b64(encoded_ctx) + end + + def flush_stats + payload = nil # : ::Hash[::String, untyped]? + + @stats_mutex.synchronize do + return if @buckets.empty? && @consumer_stats.empty? 
+ + stats_buckets = serialize_buckets + + payload = { + 'Service' => @settings.service, + 'TracerVersion' => Datadog::VERSION::STRING, + 'Lang' => 'ruby', + 'Stats' => stats_buckets, + 'Hostname' => hostname + } + + # Clear consumer stats even if sending fails to prevent unbounded memory growth + # Must be done inside mutex before we release it + @consumer_stats.clear + end + + # Send to agent outside mutex to avoid blocking customer code if agent is slow/hung + send_stats_to_agent(payload) if payload + rescue => e + @logger.debug("Failed to flush DSM stats to agent: #{e.class}: #{e}") + end + + def get_current_pathway + get_current_context + end + + def get_current_context + @pathway_context ||= begin + now = Core::Utils::Time.now + PathwayContext.new( + hash_value: 0, + pathway_start: now, + current_edge_start: now + ) + end + end + + def set_pathway_context(ctx) + if ctx + @pathway_context = ctx + @pathway_context.previous_direction = nil + @pathway_context.closest_opposite_direction_hash = 0 + @pathway_context.closest_opposite_direction_edge_start = @pathway_context.current_edge_start + end + end + + def decode_and_set_pathway_context(headers) + return unless headers && headers['dd-pathway-ctx-base64'] + + pathway_ctx = decode_pathway_context(headers['dd-pathway-ctx-base64']) + set_pathway_context(pathway_ctx) if pathway_ctx + end + + # Compute new pathway hash using FNV-1a algorithm. + # Combines service, env, tags, and parent hash to create unique pathway identifier. + def compute_pathway_hash(current_hash, tags) + service = @settings.service || 'ruby-service' + env = @settings.env || 'none' + + bytes = service.bytes + env.bytes + tags.each { |tag| bytes += tag.bytes } + byte_string = bytes.pack('C*') + + node_hash = fnv1_64(byte_string) + combined_bytes = [node_hash, current_hash].pack('QQ') + fnv1_64(combined_bytes) + end + + # FNV-1a 64-bit hash function. 
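As a rough illustration of how compute_pathway_hash above chains checkpoints together (using the fnv1_64 helper defined just below), here is a standalone sketch. The method names `fnv1a_64` / `pathway_hash` and the service, env, and tag values are made up for the example; only the constants and the two-step hash combination mirror the code in this patch.

FNV_OFFSET_BASIS = 14_695_981_039_346_656_037
FNV_PRIME        = 1_099_511_628_211

def fnv1a_64(data)
  data.each_byte.inject(FNV_OFFSET_BASIS) do |hash, byte|
    ((hash ^ byte) * FNV_PRIME) & 0xFFFFFFFFFFFFFFFF # XOR first, then multiply (FNV-1a)
  end
end

def pathway_hash(service, env, sorted_tags, parent_hash)
  node_bytes = (service.bytes + env.bytes + sorted_tags.flat_map(&:bytes)).pack('C*')
  node_hash  = fnv1a_64(node_bytes)
  fnv1a_64([node_hash, parent_hash].pack('QQ')) # fold in the parent checkpoint's hash
end

root  = pathway_hash('orders-service', 'prod', ['direction:out', 'topic:orders', 'type:kafka'], 0)
child = pathway_hash('billing-service', 'prod', ['direction:in', 'topic:orders', 'type:kafka'], root)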
+ def fnv1_64(data) + fnv_offset_basis = 14695981039346656037 + fnv_prime = 1099511628211 + + hash_value = fnv_offset_basis + data.each_byte do |byte| + hash_value ^= byte + hash_value = (hash_value * fnv_prime) & 0xFFFFFFFFFFFFFFFF + end + hash_value + end + + def record_checkpoint_stats( + hash:, parent_hash:, edge_latency_sec:, full_pathway_latency_sec:, payload_size:, tags:, + timestamp_sec: + ) + @event_buffer.push( + { + type: :checkpoint, + hash: hash, + parent_hash: parent_hash, + edge_latency_sec: edge_latency_sec, + full_pathway_latency_sec: full_pathway_latency_sec, + payload_size: payload_size, + tags: tags, + timestamp_sec: timestamp_sec + } + ) + true + end + + def record_consumer_stats(topic:, partition:, offset:, timestamp:) + # Already handled by track_kafka_consume pushing to buffer + # This method kept for API compatibility but does nothing + end + + def send_stats_to_agent(payload) + response = transport.send_stats(payload) + @logger.debug("DSM stats sent to agent: ok=#{response.ok?}") + end + + def transport + @transport ||= Transport::HTTP.default( + agent_settings: @agent_settings, + logger: @logger + ) + end + + def serialize_buckets + serialized_buckets = [] + bucket_keys_to_clear = [] + + @buckets.each do |bucket_time_ns, bucket| + bucket_keys_to_clear << bucket_time_ns + + bucket_stats = [] + bucket[:pathway_stats].each do |aggr_key, stats| + edge_tags_str, hash_value, parent_hash = aggr_key + edge_tags_array = edge_tags_str.split(',') + + bucket_stats << { + 'EdgeTags' => edge_tags_array, + 'Hash' => hash_value, + 'ParentHash' => parent_hash, + 'PathwayLatency' => stats[:full_pathway_latency].encode, + 'EdgeLatency' => stats[:edge_latency].encode, + } + end + + backlogs = [] + bucket[:latest_produce_offsets].each do |key, offset| + topic, partition = key.split(':', 2) + backlogs << { + 'Tags' => ['type:kafka_produce', "topic:#{topic}", "partition:#{partition}"], + 'Value' => offset + } + end + bucket[:latest_commit_offsets].each do |key, offset| + group, topic, partition = key.split(':', 3) + backlogs << { + 'Tags' => ['type:kafka_commit', "consumer_group:#{group}", "topic:#{topic}", "partition:#{partition}"], + 'Value' => offset + } + end + + serialized_buckets << { + 'Start' => bucket_time_ns, + 'Duration' => @bucket_size_ns, + 'Stats' => bucket_stats, + 'Backlogs' => backlogs + serialize_consumer_backlogs + } + end + + bucket_keys_to_clear.each { |key| @buckets.delete(key) } + + serialized_buckets + end + + def serialize_consumer_backlogs + @consumer_stats.map do |stat| + { + 'Tags' => [ + 'type:kafka_consume', + "topic:#{stat[:topic]}", + "partition:#{stat[:partition]}" + ], + 'Value' => stat[:offset] + } + end + end + + def hostname + Core::Environment::Socket.hostname + end + + def create_bucket + { + pathway_stats: {}, + latest_produce_offsets: {}, + latest_commit_offsets: {} + } + end + + def create_pathway_stats + { + edge_latency: Datadog::Core::DDSketch.new, + full_pathway_latency: Datadog::Core::DDSketch.new, + payload_size_sum: 0, + payload_size_count: 0 + } + end + end + end +end diff --git a/lib/datadog/data_streams/transport/http.rb b/lib/datadog/data_streams/transport/http.rb new file mode 100644 index 00000000000..26eacc14fd9 --- /dev/null +++ b/lib/datadog/data_streams/transport/http.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +require_relative '../../core/transport/http' +require_relative 'http/api' +require_relative 'http/client' +require_relative 'http/stats' +require_relative 'stats' + +module Datadog + module DataStreams + module 
Transport + # HTTP transport for Data Streams Monitoring + module HTTP + module_function + + # Builds a new Transport::HTTP::Client with default settings + def default( + agent_settings:, + logger: + ) + Core::Transport::HTTP.build( + api_instance_class: Stats::API::Instance, + agent_settings: agent_settings, + logger: logger, + headers: { + 'Content-Type' => 'application/msgpack', + 'Content-Encoding' => 'gzip' + } + ) do |transport| + apis = API.defaults + + transport.api API::V01, apis[API::V01], default: true + + # Call block to apply any customization, if provided + yield(transport) if block_given? + end.to_transport(Transport::Stats::Transport) + end + end + end + end +end diff --git a/lib/datadog/data_streams/transport/http/api.rb b/lib/datadog/data_streams/transport/http/api.rb new file mode 100644 index 00000000000..25f6212c133 --- /dev/null +++ b/lib/datadog/data_streams/transport/http/api.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +require_relative '../../../core/transport/http/api/map' +require_relative '../../../core/transport/http/api/instance' +require_relative '../../../core/transport/http/api/spec' +require_relative '../../../core/transport/http/api/endpoint' +require_relative 'stats' + +module Datadog + module DataStreams + module Transport + module HTTP + # Namespace for API components + module API + # API version + V01 = 'v0.1' + + module_function + + def defaults + Core::Transport::HTTP::API::Map[ + V01 => Stats::API::Spec.new do |s| + s.stats = Stats::API::Endpoint.new( + '/v0.1/pipeline_stats' + ) + end + ] + end + end + end + end + end +end diff --git a/lib/datadog/data_streams/transport/http/client.rb b/lib/datadog/data_streams/transport/http/client.rb new file mode 100644 index 00000000000..e300abf901e --- /dev/null +++ b/lib/datadog/data_streams/transport/http/client.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +require_relative '../../../core/transport/http/response' + +module Datadog + module DataStreams + module Transport + module HTTP + # HTTP client for Data Streams Monitoring + class Client + attr_reader :api, :logger + + def initialize(api, logger:) + @api = api + @logger = logger + end + + def send_stats_payload(request) + send_request(request) do |api, env| + api.send_stats(env) + end + end + + private + + def send_request(request, &block) + # Build request into env + env = build_env(request) + + # Get response from API + yield(api, env) + rescue => e + message = + "Internal error during #{self.class.name} request. 
Cause: #{e.class}: #{e} " \ + "Location: #{Array(e.backtrace).first}" + + logger.debug(message) + + Datadog::Core::Transport::InternalErrorResponse.new(e) + end + + def build_env(request) + Datadog::Core::Transport::HTTP::Env.new(request) + end + end + end + end + end +end diff --git a/lib/datadog/data_streams/transport/http/stats.rb b/lib/datadog/data_streams/transport/http/stats.rb new file mode 100644 index 00000000000..151f7acb8f8 --- /dev/null +++ b/lib/datadog/data_streams/transport/http/stats.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +require_relative '../stats' +require_relative 'client' +require_relative '../../../core/transport/http/response' +require_relative '../../../core/transport/http/api/endpoint' +require_relative '../../../core/transport/http/api/spec' +require_relative '../../../core/transport/http/api/instance' + +module Datadog + module DataStreams + module Transport + module HTTP + # HTTP transport behavior for Data Streams stats + module Stats + # Response from HTTP transport for DSM stats + class Response + include Datadog::Core::Transport::HTTP::Response + + def initialize(http_response) + super + end + end + + module API + # HTTP API Spec for DSM + class Spec < Core::Transport::HTTP::API::Spec + attr_accessor :stats + + def send_stats(env, &block) + raise Core::Transport::HTTP::API::Spec::EndpointNotDefinedError.new('stats', self) if stats.nil? + + stats.call(env, &block) + end + + def encoder + # DSM handles encoding in the transport layer (MessagePack + gzip) + # so we don't need an encoder at the API level + nil + end + end + + # HTTP API Instance for DSM + class Instance < Core::Transport::HTTP::API::Instance + def send_stats(env) + unless spec.is_a?(Stats::API::Spec) + raise Core::Transport::HTTP::API::Instance::EndpointNotSupportedError.new( + 'stats', self + ) + end + + spec.send_stats(env) do |request_env| + call(request_env) + end + end + end + + # Endpoint for submitting DSM stats data + class Endpoint < Core::Transport::HTTP::API::Endpoint + def initialize(path) + super(:post, path) + end + + def call(env, &block) + # Build request + env.verb = verb + env.path = path + env.body = env.request.parcel.data + + # Send request + http_response = yield(env) + + # Build response + Response.new(http_response) + end + + def encoder + # DSM handles encoding in the transport layer + nil + end + end + end + end + end + end + end +end diff --git a/lib/datadog/data_streams/transport/stats.rb b/lib/datadog/data_streams/transport/stats.rb new file mode 100644 index 00000000000..9cad48d2090 --- /dev/null +++ b/lib/datadog/data_streams/transport/stats.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +require 'msgpack' +require 'zlib' +require_relative '../../core/transport/parcel' +require_relative '../../core/transport/request' + +module Datadog + module DataStreams + module Transport + module Stats + # Parcel for encoded DSM stats payload + class EncodedParcel + include Datadog::Core::Transport::Parcel + + def initialize(data) + @data = data + end + + attr_reader :data + end + + # Request for DSM stats + class Request < Datadog::Core::Transport::Request + end + + # Transport for Data Streams Monitoring stats + class Transport + attr_reader :client, :apis, :current_api_id, :logger + + def initialize(apis, default_api, logger:) + @apis = apis + @logger = logger + @default_api = default_api + @current_api_id = default_api + + @client = HTTP::Client.new(current_api, logger: @logger) + end + + def send_stats(payload) + # MessagePack encode and gzip compress 
the payload + msgpack_data = MessagePack.pack(payload) + compressed_data = Zlib.gzip(msgpack_data) + + # Create parcel and request + parcel = EncodedParcel.new(compressed_data) + request = Request.new(parcel) + + # Send to agent + client.send_stats_payload(request) + end + + def current_api + apis[@current_api_id] + end + end + end + end + end +end diff --git a/lib/datadog/tracing/contrib/kafka/instrumentation/consumer.rb b/lib/datadog/tracing/contrib/kafka/instrumentation/consumer.rb new file mode 100644 index 00000000000..e64be4e0d77 --- /dev/null +++ b/lib/datadog/tracing/contrib/kafka/instrumentation/consumer.rb @@ -0,0 +1,66 @@ +# frozen_string_literal: true + +module Datadog + module Tracing + module Contrib + module Kafka + module Instrumentation + # Instrumentation for Kafka::Consumer + module Consumer + def self.prepended(base) + base.prepend(InstanceMethods) + end + + # Instance methods for consumer instrumentation + module InstanceMethods + def each_message(**kwargs, &block) + return super unless Datadog::DataStreams.enabled? + + wrapped_block = proc do |message| + Datadog.logger.debug { "Kafka each_message: DSM enabled for topic #{message.topic}" } + + begin + headers = message.headers || {} + Datadog::DataStreams.set_consume_checkpoint( + type: 'kafka', + source: message.topic, + auto_instrumentation: true + ) { |key| headers[key] } + rescue => e + Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") + end + + yield(message) if block + end + + super(**kwargs, &wrapped_block) + end + + def each_batch(**kwargs, &block) + return super unless Datadog::DataStreams.enabled? + + wrapped_block = proc do |batch| + Datadog.logger.debug { "Kafka each_batch: DSM enabled for topic #{batch.topic}" } + + begin + Datadog::DataStreams.set_consume_checkpoint( + type: 'kafka', + source: batch.topic, + auto_instrumentation: true + ) + rescue => e + Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") + end + + yield(batch) if block + end + + super(**kwargs, &wrapped_block) + end + end + end + end + end + end + end +end diff --git a/lib/datadog/tracing/contrib/kafka/instrumentation/producer.rb b/lib/datadog/tracing/contrib/kafka/instrumentation/producer.rb new file mode 100644 index 00000000000..d450920e3cc --- /dev/null +++ b/lib/datadog/tracing/contrib/kafka/instrumentation/producer.rb @@ -0,0 +1,66 @@ +# frozen_string_literal: true + +module Datadog + module Tracing + module Contrib + module Kafka + module Instrumentation + # Instrumentation for Kafka::Producer + module Producer + def self.prepended(base) + base.prepend(InstanceMethods) + end + + module InstanceMethods + def deliver_messages(**kwargs) + if Datadog::DataStreams.enabled? + begin + pending_messages = instance_variable_get(:@pending_message_queue) + + if pending_messages && !pending_messages.empty? + pending_messages.each do |message| + message.headers ||= {} + Datadog::DataStreams.set_produce_checkpoint( + type: 'kafka', + destination: message.topic, + auto_instrumentation: true + ) do |key, value| + message.headers[key] = value + end + end + end + rescue => e + Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") + end + end + + super + end + + def send_messages(messages, **kwargs) + if Datadog::DataStreams.enabled? 
+ begin + messages.each do |message| + message[:headers] ||= {} + Datadog::DataStreams.set_produce_checkpoint( + type: 'kafka', + destination: message[:topic], + auto_instrumentation: true + ) do |key, value| + message[:headers][key] = value + end + end + rescue => e + Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") + end + end + + super + end + end + end + end + end + end + end +end diff --git a/lib/datadog/tracing/contrib/kafka/patcher.rb b/lib/datadog/tracing/contrib/kafka/patcher.rb index 8e3acf1e112..a8d20be739d 100644 --- a/lib/datadog/tracing/contrib/kafka/patcher.rb +++ b/lib/datadog/tracing/contrib/kafka/patcher.rb @@ -21,6 +21,20 @@ def target_version def patch # Subscribe to Kafka events Events.subscribe! + + # Apply monkey patches for additional instrumentation (e.g., DSM) + patch_producer if defined?(::Kafka::Producer) + patch_consumer if defined?(::Kafka::Consumer) + end + + def patch_producer + require_relative 'instrumentation/producer' + ::Kafka::Producer.prepend(Instrumentation::Producer) + end + + def patch_consumer + require_relative 'instrumentation/consumer' + ::Kafka::Consumer.prepend(Instrumentation::Consumer) end end end diff --git a/lib/datadog/tracing/contrib/karafka/monitor.rb b/lib/datadog/tracing/contrib/karafka/monitor.rb index 847cec75092..5befa6d7d4e 100644 --- a/lib/datadog/tracing/contrib/karafka/monitor.rb +++ b/lib/datadog/tracing/contrib/karafka/monitor.rb @@ -47,6 +47,17 @@ def instrument(event_id, payload = {}, &block) span.set_tag(Ext::TAG_CONSUMER, consumer) span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, job.executor.topic.name) span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM) + + # DSM: Track consumer offset stats for batch processing + if Datadog.configuration.data_streams.enabled + job.messages.each do |message| + Datadog::DataStreams.track_kafka_consume( + job.executor.topic.name, + job.executor.partition, + message.metadata.offset + ) + end + end end super diff --git a/lib/datadog/tracing/contrib/karafka/patcher.rb b/lib/datadog/tracing/contrib/karafka/patcher.rb index 43b9e428557..a8898bcc9af 100644 --- a/lib/datadog/tracing/contrib/karafka/patcher.rb +++ b/lib/datadog/tracing/contrib/karafka/patcher.rb @@ -35,6 +35,24 @@ def each(&block) Datadog::Tracing.continue_trace!(trace_digest) if trace_digest end + if Datadog::DataStreams.enabled? + begin + headers = if message.metadata.respond_to?(:raw_headers) + message.metadata.raw_headers + else + message.metadata.headers + end + + Datadog::DataStreams.set_consume_checkpoint( + type: 'kafka', + source: message.topic, + auto_instrumentation: true + ) { |key| headers[key] } + rescue => e + Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}") + end + end + Tracing.trace(Ext::SPAN_MESSAGE_CONSUME) do |span| span.set_tag(Ext::TAG_OFFSET, message.metadata.offset) span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, message.topic) diff --git a/sig/datadog/core/configuration/components.rbs b/sig/datadog/core/configuration/components.rbs index 4b299672115..09fac6ab469 100644 --- a/sig/datadog/core/configuration/components.rbs +++ b/sig/datadog/core/configuration/components.rbs @@ -38,6 +38,8 @@ module Datadog attr_reader agent_info: Datadog::Core::Environment::AgentInfo + attr_reader data_streams: Datadog::DataStreams::Processor? 
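Alongside the ruby-kafka and Karafka auto-instrumentation above, services can also set checkpoints manually through the public Datadog::DataStreams API once the feature is enabled via the `data_streams` settings added in this patch. A usage sketch follows; the service name, topic, and the in-memory `headers` hand-off between producer and consumer are illustrative stand-ins, not part of the patch.

require 'datadog'

Datadog.configure do |c|
  c.service = 'orders-service'
  c.data_streams.enabled = true # or export DD_DATA_STREAMS_ENABLED=true
end

# Producer side: inject the pathway context into outgoing message headers.
headers = {}
Datadog::DataStreams.set_produce_checkpoint(
  type: 'kafka',
  destination: 'orders'
) { |key, value| headers[key] = value }
# When the processor is active, headers now carry 'dd-pathway-ctx-base64';
# pass them to whatever messaging client actually publishes the message.

# Consumer side: read the pathway context back out of the received headers.
incoming_headers = headers # stand-in for headers delivered with the consumed message
Datadog::DataStreams.set_consume_checkpoint(
  type: 'kafka',
  source: 'orders'
) { |key| incoming_headers[key] }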
+ def initialize: (untyped settings) -> untyped def startup!: (untyped settings) -> untyped diff --git a/sig/datadog/data_streams.rbs b/sig/datadog/data_streams.rbs new file mode 100644 index 00000000000..9d254a9db5e --- /dev/null +++ b/sig/datadog/data_streams.rbs @@ -0,0 +1,22 @@ +module Datadog + module DataStreams + def self.set_produce_checkpoint: (type: ::String, destination: ::String, ?auto_instrumentation: bool, ?tags: ::Hash[::String, ::String]) ?{ (::String, ::String) -> void } -> ::String? + + def self.set_consume_checkpoint: (type: ::String, source: ::String, ?auto_instrumentation: bool, ?tags: ::Hash[::String, ::String]) ?{ (::String) -> ::String? } -> ::String? + + def self.track_kafka_produce: (::String topic, ::Integer partition, ::Integer offset) -> (nil | true) + + def self.track_kafka_commit: (::String group, ::String topic, ::Integer partition, ::Integer offset) -> (nil | true) + + def self.track_kafka_consume: (::String topic, ::Integer partition, ::Integer offset) -> (nil | true) + + def self.enabled?: () -> bool + + private + + def self.processor: () -> Processor? + + def self.components: () -> untyped + end +end + diff --git a/sig/datadog/data_streams/configuration.rbs b/sig/datadog/data_streams/configuration.rbs new file mode 100644 index 00000000000..69fc5962257 --- /dev/null +++ b/sig/datadog/data_streams/configuration.rbs @@ -0,0 +1,6 @@ +module Datadog + module DataStreams + module Configuration + end + end +end diff --git a/sig/datadog/data_streams/configuration/settings.rbs b/sig/datadog/data_streams/configuration/settings.rbs new file mode 100644 index 00000000000..a32363ccd64 --- /dev/null +++ b/sig/datadog/data_streams/configuration/settings.rbs @@ -0,0 +1,10 @@ +module Datadog + module DataStreams + module Configuration + module Settings + def self.extended: (Module base) -> void + end + end + end +end + diff --git a/sig/datadog/data_streams/ext.rbs b/sig/datadog/data_streams/ext.rbs new file mode 100644 index 00000000000..4946f53f809 --- /dev/null +++ b/sig/datadog/data_streams/ext.rbs @@ -0,0 +1,8 @@ +module Datadog + module DataStreams + module Ext + ENV_ENABLED: ::String + end + end +end + diff --git a/sig/datadog/data_streams/extensions.rbs b/sig/datadog/data_streams/extensions.rbs new file mode 100644 index 00000000000..427a2c4e9e2 --- /dev/null +++ b/sig/datadog/data_streams/extensions.rbs @@ -0,0 +1,8 @@ +module Datadog + module DataStreams + module Extensions + def self.activate!: () -> void + end + end +end + diff --git a/sig/datadog/data_streams/pathway_context.rbs b/sig/datadog/data_streams/pathway_context.rbs new file mode 100644 index 00000000000..ff624e543ae --- /dev/null +++ b/sig/datadog/data_streams/pathway_context.rbs @@ -0,0 +1,47 @@ +module Datadog + module DataStreams + class PathwayContext + @hash: ::Integer + + @pathway_start: ::Time + + @current_edge_start: ::Time + + @parent_hash: ::Integer? + @previous_direction: ::String? + + @closest_opposite_direction_hash: ::Integer + + @closest_opposite_direction_edge_start: ::Time + + attr_accessor hash: ::Integer + + attr_accessor pathway_start: ::Time + + attr_accessor current_edge_start: ::Time + + attr_accessor parent_hash: ::Integer? + + attr_accessor previous_direction: ::String? 
+ + attr_accessor closest_opposite_direction_hash: ::Integer + + attr_accessor closest_opposite_direction_edge_start: ::Time + + def initialize: (hash_value: ::Integer, pathway_start: ::Time, current_edge_start: ::Time) -> void + + def encode: () -> ::String + + def encode_b64: () -> ::String + def self.decode_b64: (::String encoded_ctx) -> PathwayContext? + def self.decode: (::String binary_data) -> PathwayContext? + + private + + def encode_var_int_64: (::Integer value) -> ::String + def time_to_ms: (::Time time) -> ::Integer + def self.decode_varint: (::StringIO io) -> ::Integer? + def self.ms_to_time: (::Integer milliseconds) -> ::Time + end + end + end diff --git a/sig/datadog/data_streams/processor.rbs b/sig/datadog/data_streams/processor.rbs new file mode 100644 index 00000000000..6ccef167864 --- /dev/null +++ b/sig/datadog/data_streams/processor.rbs @@ -0,0 +1,86 @@ +module Datadog + module DataStreams + class UnsupportedError < StandardError + end + + class Processor < Core::Worker + PROPAGATION_KEY: ::String + DEFAULT_BUFFER_SIZE: ::Integer + + @pathway_context: PathwayContext + + @bucket_size_ns: ::Integer + + @buckets: ::Hash[::Integer, bucket_type] + + @consumer_stats: ::Array[consumer_stat_type] + + @stats_mutex: ::Thread::Mutex + + @event_buffer: Core::Buffer::CRuby + + @transport: Transport::Stats::Transport? + + @logger: Core::Logger + + @settings: Core::Configuration::Settings + + @agent_settings: Core::Configuration::AgentSettings + + include Core::Workers::Polling + include Core::Workers::IntervalLoop + + def initialize: (interval: ::Float, logger: Core::Logger, settings: Core::Configuration::Settings, agent_settings: Core::Configuration::AgentSettings, ?buffer_size: ::Integer) -> void + def track_kafka_produce: (::String topic, ::Integer partition, ::Integer offset, ::Time now) -> true + def track_kafka_consume: (::String topic, ::Integer partition, ::Integer offset, ::Time now) -> true + def set_produce_checkpoint: (type: ::String, destination: ::String, ?manual_checkpoint: bool, ?tags: ::Hash[::String, ::String]) ?{ (::String, ::String) -> void } -> ::String? + def set_consume_checkpoint: (type: ::String, source: ::String, ?manual_checkpoint: bool, ?tags: ::Hash[::String, ::String]) ?{ (::String) -> ::String? } -> ::String? + + private + + def encode_pathway_context: () -> ::String? + + def set_checkpoint: (tags: ::Array[::String], ?now: ::Time?, ?payload_size: ::Integer, ?span: Tracing::SpanOperation?) -> ::String? + + def decode_pathway_context: (::String encoded_ctx) -> PathwayContext? + + def decode_pathway_b64: (::String encoded_ctx) -> PathwayContext? + def perform: () -> (bool | nil) + def flush_stats: () -> void + + def process_events: () -> void + def process_kafka_produce_event: (::Hash[::Symbol, untyped] event) -> void + def process_kafka_consume_event: (::Hash[::Symbol, untyped] event) -> void + def process_checkpoint_event: (::Hash[::Symbol, untyped] event) -> void + + def get_current_pathway: () -> PathwayContext? + def get_current_context: () -> PathwayContext + + def set_pathway_context: (PathwayContext? 
ctx) -> void + + def decode_and_set_pathway_context: (::Hash[::String, ::String] headers) -> void + def compute_pathway_hash: (::Integer current_hash, ::Array[::String] tags) -> ::Integer + def fnv1_64: (::String data) -> ::Integer + def record_checkpoint_stats: (hash: ::Integer, parent_hash: ::Integer, edge_latency_sec: ::Float, full_pathway_latency_sec: ::Float, payload_size: ::Integer, tags: ::Array[::String], timestamp_sec: ::Float) -> void + def record_consumer_stats: (topic: ::String, partition: ::Integer, offset: ::Integer, timestamp: ::Time) -> void + def aggregate_consumer_stats_by_partition: (::String topic, ::Integer partition, ::Integer offset, ::Time timestamp) -> void + def send_stats_to_agent: (::Hash[::String, untyped] payload) -> void + def send_dsm_payload: (::String data, ::Hash[::String, ::String] headers) -> untyped + def compress_payload?: (::String payload) -> bool + def gzip_compress: (::String data) -> ::String + def serialize_buckets: () -> ::Array[::Hash[::String, untyped]] + def serialize_consumer_backlogs: () -> ::Array[::Hash[::String, untyped]] + def hostname: () -> ::String + def create_bucket: () -> bucket_type + def create_pathway_stats: () -> pathway_stats_type + def transport: () -> Transport::Stats::Transport + def service_name: () -> (::String | nil) + def env_name: () -> (::String | nil) + def resolved_agent_settings: () -> Core::Configuration::AgentSettings + + type bucket_type = { pathway_stats: ::Hash[::Array[untyped], pathway_stats_type], latest_produce_offsets: ::Hash[::String, ::Integer], latest_commit_offsets: ::Hash[::String, ::Integer] } + type pathway_stats_type = { edge_latency: Core::DDSketch, full_pathway_latency: Core::DDSketch, payload_size_sum: ::Integer, payload_size_count: ::Integer } + type consumer_stat_type = { topic: ::String, partition: ::Integer, offset: ::Integer, timestamp: ::Time, timestamp_sec: ::Float } + end + end + end diff --git a/sig/datadog/data_streams/transport/http.rbs b/sig/datadog/data_streams/transport/http.rbs new file mode 100644 index 00000000000..77c1b862815 --- /dev/null +++ b/sig/datadog/data_streams/transport/http.rbs @@ -0,0 +1,12 @@ +module Datadog + module DataStreams + module Transport + module HTTP + def self?.default: ( + agent_settings: Core::Configuration::AgentSettings, + logger: Core::Logger + ) ?{ (Core::Transport::HTTP::Builder) -> void } -> Transport::Stats::Transport + end + end + end +end diff --git a/sig/datadog/data_streams/transport/http/api.rbs b/sig/datadog/data_streams/transport/http/api.rbs new file mode 100644 index 00000000000..fd8c7375f74 --- /dev/null +++ b/sig/datadog/data_streams/transport/http/api.rbs @@ -0,0 +1,13 @@ +module Datadog + module DataStreams + module Transport + module HTTP + module API + V01: ::String + + def self?.defaults: () -> ::Hash[::String, Stats::API::Spec] + end + end + end + end +end diff --git a/sig/datadog/data_streams/transport/http/client.rbs b/sig/datadog/data_streams/transport/http/client.rbs new file mode 100644 index 00000000000..dcc3551cab7 --- /dev/null +++ b/sig/datadog/data_streams/transport/http/client.rbs @@ -0,0 +1,25 @@ +module Datadog + module DataStreams + module Transport + module HTTP + class Client + @api: Stats::API::Instance + @logger: Core::Logger + + attr_reader api: Stats::API::Instance + attr_reader logger: Core::Logger + + def initialize: (Stats::API::Instance api, logger: Core::Logger) -> void + + def send_stats_payload: (Transport::Stats::Request request) -> (Core::Transport::HTTP::Response | 
Core::Transport::InternalErrorResponse) + + private + + def send_request: (Transport::Stats::Request request) { (Stats::API::Instance, Core::Transport::HTTP::Env) -> Core::Transport::HTTP::Response } -> (Core::Transport::HTTP::Response | Core::Transport::InternalErrorResponse) + + def build_env: (Transport::Stats::Request request) -> Core::Transport::HTTP::Env + end + end + end + end +end diff --git a/sig/datadog/data_streams/transport/http/stats.rbs b/sig/datadog/data_streams/transport/http/stats.rbs new file mode 100644 index 00000000000..31d0d846713 --- /dev/null +++ b/sig/datadog/data_streams/transport/http/stats.rbs @@ -0,0 +1,37 @@ +module Datadog + module DataStreams + module Transport + module HTTP + module Stats + class Response + include Core::Transport::HTTP::Response + + def initialize: (Core::Transport::HTTP::Response http_response) -> void + end + + module API + class Spec < Core::Transport::HTTP::API::Spec + attr_accessor stats: Endpoint? + + def send_stats: (Core::Transport::HTTP::Env env) { (Core::Transport::HTTP::Env) -> Core::Transport::HTTP::Response } -> Core::Transport::HTTP::Response + + def encoder: () -> nil + end + + class Instance < Core::Transport::HTTP::API::Instance + def send_stats: (Core::Transport::HTTP::Env env) -> Core::Transport::HTTP::Response + end + + class Endpoint < Core::Transport::HTTP::API::Endpoint + def initialize: (::String path) -> void + + def call: (Core::Transport::HTTP::Env env) { (Core::Transport::HTTP::Env) -> Core::Transport::HTTP::Response } -> Response + + def encoder: () -> nil + end + end + end + end + end + end +end diff --git a/sig/datadog/data_streams/transport/stats.rbs b/sig/datadog/data_streams/transport/stats.rbs new file mode 100644 index 00000000000..2d7a94c4c8e --- /dev/null +++ b/sig/datadog/data_streams/transport/stats.rbs @@ -0,0 +1,43 @@ +module Datadog + module DataStreams + module Transport + module Stats + class EncodedParcel + include Core::Transport::Parcel + + @data: ::String + + attr_reader data: ::String + + def initialize: (::String data) -> void + end + + class Request < Core::Transport::Request + end + + class Transport + @apis: ::Hash[::String, HTTP::Stats::API::Instance] + @logger: Core::Logger + @default_api: ::String + @current_api_id: ::String + @client: HTTP::Client + + attr_reader client: HTTP::Client + attr_reader apis: ::Hash[::String, HTTP::Stats::API::Instance] + attr_reader current_api_id: ::String + attr_reader logger: Core::Logger + + def initialize: ( + ::Hash[::String, HTTP::Stats::API::Instance] apis, + ::String default_api, + logger: Core::Logger + ) -> void + + def send_stats: (::Hash[::String, untyped] payload) -> (Core::Transport::HTTP::Response | Core::Transport::InternalErrorResponse) + + def current_api: () -> HTTP::Stats::API::Instance + end + end + end + end +end diff --git a/sig/datadog/tracing/contrib/kafka/instrumentation/consumer.rbs b/sig/datadog/tracing/contrib/kafka/instrumentation/consumer.rbs new file mode 100644 index 00000000000..d69bf64d684 --- /dev/null +++ b/sig/datadog/tracing/contrib/kafka/instrumentation/consumer.rbs @@ -0,0 +1,17 @@ +module Datadog + module Tracing + module Contrib + module Kafka + module Instrumentation + module Consumer + def self.included: (untyped base) -> untyped + module InstanceMethods + def each_message: (**untyped kwargs) { (untyped) -> untyped } -> untyped + def each_batch: (**untyped kwargs) { (untyped) -> untyped } -> untyped + end + end + end + end + end + end +end diff --git 
a/sig/datadog/tracing/contrib/kafka/instrumentation/producer.rbs b/sig/datadog/tracing/contrib/kafka/instrumentation/producer.rbs new file mode 100644 index 00000000000..e002ae829fa --- /dev/null +++ b/sig/datadog/tracing/contrib/kafka/instrumentation/producer.rbs @@ -0,0 +1,17 @@ +module Datadog + module Tracing + module Contrib + module Kafka + module Instrumentation + module Producer + def self.included: (untyped base) -> untyped + module InstanceMethods + def deliver_messages: (**untyped kwargs) -> untyped + def send_messages: (untyped messages, **untyped kwargs) -> untyped + end + end + end + end + end + end +end diff --git a/spec/datadog/core/ddsketch_spec.rb b/spec/datadog/core/ddsketch_spec.rb index ae024632a59..edb928cf7b5 100644 --- a/spec/datadog/core/ddsketch_spec.rb +++ b/spec/datadog/core/ddsketch_spec.rb @@ -1,3 +1,4 @@ +require 'datadog/core' require 'datadog/core/ddsketch' require 'datadog/core/ddsketch_pprof/ddsketch_pb' diff --git a/spec/datadog/data_streams/configuration/settings_spec.rb b/spec/datadog/data_streams/configuration/settings_spec.rb new file mode 100644 index 00000000000..05b3cb3151a --- /dev/null +++ b/spec/datadog/data_streams/configuration/settings_spec.rb @@ -0,0 +1,101 @@ +require 'spec_helper' + +RSpec.describe Datadog::DataStreams::Configuration::Settings do + subject(:settings) { Datadog::Core::Configuration::Settings.new } + + describe 'data_streams' do + describe '#enabled' do + subject(:enabled) { settings.data_streams.enabled } + + context 'when DD_DATA_STREAMS_ENABLED' do + around do |example| + ClimateControl.modify('DD_DATA_STREAMS_ENABLED' => data_streams_enabled) do + example.run + end + end + + context 'is not defined' do + let(:data_streams_enabled) { nil } + + it { is_expected.to eq false } + end + + context 'is defined as true' do + let(:data_streams_enabled) { 'true' } + + it { is_expected.to eq true } + end + + context 'is defined as false' do + let(:data_streams_enabled) { 'false' } + + it { is_expected.to eq false } + end + end + end + + describe '#enabled=' do + subject(:set_data_streams_enabled) { settings.data_streams.enabled = data_streams_enabled } + + [true, false].each do |value| + context "when given #{value}" do + let(:data_streams_enabled) { value } + + before { set_data_streams_enabled } + + it { expect(settings.data_streams.enabled).to eq(value) } + end + end + end + + describe '#interval' do + subject(:interval) { settings.data_streams.interval } + + context 'when _DD_TRACE_STATS_WRITER_INTERVAL' do + around do |example| + ClimateControl.modify('_DD_TRACE_STATS_WRITER_INTERVAL' => data_streams_interval) do + example.run + end + end + + context 'is not defined' do + let(:data_streams_interval) { nil } + + it { is_expected.to eq 10.0 } + end + + context 'is defined' do + let(:data_streams_interval) { '5.0' } + + it { is_expected.to eq 5.0 } + end + + context 'is defined as an integer' do + let(:data_streams_interval) { '20' } + + it { is_expected.to eq 20.0 } + end + end + end + + describe '#interval=' do + subject(:set_data_streams_interval) { settings.data_streams.interval = data_streams_interval } + + context 'when given a float value' do + let(:data_streams_interval) { 15.5 } + + before { set_data_streams_interval } + + it { expect(settings.data_streams.interval).to eq(15.5) } + end + + context 'when given an integer value' do + let(:data_streams_interval) { 30 } + + before { set_data_streams_interval } + + it { expect(settings.data_streams.interval).to eq(30.0) } + end + end + end +end diff --git 
a/spec/datadog/data_streams/pathway_context_spec.rb b/spec/datadog/data_streams/pathway_context_spec.rb new file mode 100644 index 00000000000..5ef7bc37c0c --- /dev/null +++ b/spec/datadog/data_streams/pathway_context_spec.rb @@ -0,0 +1,106 @@ +# frozen_string_literal: true + +require 'datadog/data_streams/pathway_context' + +RSpec.describe Datadog::DataStreams::PathwayContext do + describe 'encode/decode round-trip' do + let(:hash_value) { 12345678901234567890 } + let(:pathway_start) { Time.at(1609459200.123) } # 2021-01-01 00:00:00.123 + let(:current_edge_start) { Time.at(1609459260.456) } # 2021-01-01 00:01:00.456 + + let(:context) do + described_class.new( + hash_value: hash_value, + pathway_start: pathway_start, + current_edge_start: current_edge_start + ) + end + + it 'successfully encodes and decodes pathway context' do + # Arrange & Act: Encode to base64 + encoded = context.encode_b64 + + # Act: Decode back to object + decoded_context = described_class.decode_b64(encoded) + + # Assert: Values should match original + expect(decoded_context).not_to be_nil + expect(decoded_context.hash).to eq(hash_value) + expect(decoded_context.pathway_start.to_f).to be_within(0.001).of(pathway_start.to_f) + expect(decoded_context.current_edge_start.to_f).to be_within(0.001).of(current_edge_start.to_f) + end + + it 'handles edge cases in encoding/decoding' do + # Test with zero values + zero_context = described_class.new(hash_value: 0, pathway_start: Time.at(0), current_edge_start: Time.at(0)) + encoded = zero_context.encode_b64 + decoded = described_class.decode_b64(encoded) + + expect(decoded).not_to be_nil + expect(decoded.hash).to eq(0) + expect(decoded.pathway_start.to_f).to eq(0.0) + expect(decoded.current_edge_start.to_f).to eq(0.0) + end + + it 'handles large values in encoding/decoding' do + # Test with large values + large_hash = 18446744073709551615 # Max uint64 + large_time = Time.now + 1000000 # Far future + + large_context = described_class.new( + hash_value: large_hash, + pathway_start: large_time, + current_edge_start: large_time + 100 + ) + encoded = large_context.encode_b64 + decoded = described_class.decode_b64(encoded) + + expect(decoded).not_to be_nil + expect(decoded.hash).to eq(large_hash) + expect(decoded.pathway_start.to_f).to be_within(0.001).of(large_time.to_f) + expect(decoded.current_edge_start.to_f).to be_within(0.001).of((large_time + 100).to_f) + end + end + + describe 'decode error handling' do + it 'returns nil for invalid base64' do + result = described_class.decode_b64('invalid-base64!') + expect(result).to be_nil + end + + it 'returns nil for empty string' do + result = described_class.decode_b64('') + expect(result).to be_nil + end + + it 'returns nil for nil input' do + result = described_class.decode_b64(nil) + expect(result).to be_nil + end + + it 'returns nil for truncated data' do + # Base64 encode only 4 bytes (need at least 8 for hash) + truncated = Datadog::Core::Utils::Base64.strict_encode64("\x01\x02\x03\x04") + result = described_class.decode_b64(truncated) + expect(result).to be_nil + end + end + + describe 'VarInt encoding/decoding' do + it 'correctly encodes and decodes VarInt values' do + test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, 2097151, 2097152] + + test_values.each do |value| + # Create context with test value as timestamp + context = described_class.new(hash_value: 12345, pathway_start: Time.at(value / 1000.0), current_edge_start: Time.at(value / 1000.0)) + + encoded = context.encode_b64 + decoded = 
described_class.decode_b64(encoded) + + expect(decoded).not_to be_nil, "Failed to decode VarInt value: #{value}" + expect(decoded.pathway_start.to_f).to be_within(0.001).of(value / 1000.0) + expect(decoded.current_edge_start.to_f).to be_within(0.001).of(value / 1000.0) + end + end + end +end diff --git a/spec/datadog/data_streams/processor_spec.rb b/spec/datadog/data_streams/processor_spec.rb new file mode 100644 index 00000000000..99d5e08e570 --- /dev/null +++ b/spec/datadog/data_streams/processor_spec.rb @@ -0,0 +1,296 @@ +# frozen_string_literal: true + +require 'datadog/core' +require 'datadog/data_streams/processor' +require 'datadog/core/ddsketch' +require_relative 'spec_helper' + +# Expected deterministic hash values for specific pathways (with manual_checkpoint: false) +KAFKA_ORDERS_PRODUCE_HASH = 17981503584283442515 +KAFKA_ORDERS_CONSUME_HASH = 2205397010147396424 # with carrier from produce +KAFKA_ORDERS_CONSUME_HASH_WITHOUT_CARRIER = 9826962151962828715 # without carrier +KINESIS_ORDERS_PRODUCE_HASH = 14687993552271180499 +KAFKA_PAYMENTS_PRODUCE_HASH = 10550901661805295262 + +RSpec.describe Datadog::DataStreams::Processor do + before do + skip_if_data_streams_not_supported(self) + end + + let(:logger) { instance_double(Datadog::Core::Logger, debug: nil) } + let(:settings) { double('Settings', service: Datadog.configuration.service, env: Datadog.configuration.env) } + let(:agent_settings) { Datadog::Core::Configuration::AgentSettings.new(adapter: :test, hostname: 'localhost', port: 9999) } + let(:processor) { described_class.new(interval: 10.0, logger: logger, settings: settings, agent_settings: agent_settings) } + + before do + # Stub HTTP requests to the agent + stub_request(:post, %r{http://localhost:9999/v0.1/pipeline_stats}) + .to_return(status: 200, body: '', headers: {}) + end + + describe '#initialize' do + it 'sets up periodic worker with custom interval' do + processor = described_class.new(interval: 5.0, logger: logger, settings: settings, agent_settings: agent_settings) + expect(processor.loop_base_interval).to eq(5.0) + end + end + + describe 'public checkpoint API' do + after { processor.stop(true) } + + describe '#set_produce_checkpoint' do + it 'returns a hash' do + result = processor.set_produce_checkpoint(type: 'kafka', destination: 'orders') + expect(result).to be_a(String) + expect(result).not_to be_empty + end + + it 'computes deterministic hash' do + processor.set_produce_checkpoint(type: 'kafka', destination: 'orders', manual_checkpoint: false) + expect(processor.pathway_context.hash).to eq(KAFKA_ORDERS_PRODUCE_HASH) + end + + it 'adds the hash to the carrier' do + carrier = {} + returned_value = processor.set_produce_checkpoint(type: 'kafka', destination: 'orders', manual_checkpoint: false) do |key, value| + carrier[key] = value + end + + expect(carrier[Datadog::DataStreams::Processor::PROPAGATION_KEY]).to eq(returned_value) + + # Decode and verify the pathway context contains the expected hash + decoded = Datadog::DataStreams::PathwayContext.decode_b64(returned_value) + expect(decoded).to have_attributes(hash: KAFKA_ORDERS_PRODUCE_HASH) + end + + it 'sets tags on the active_span for that hash' do + span = instance_double(Datadog::Tracing::SpanOperation) + allow(Datadog::Tracing).to receive(:active_span).and_return(span) + expect(span).to receive(:set_tag).with('pathway.hash', KAFKA_ORDERS_PRODUCE_HASH.to_s) + + processor.set_produce_checkpoint(type: 'kafka', destination: 'orders', manual_checkpoint: false) + end + + it 'advances the pathway context with 
new hash' do + initial_hash = processor.pathway_context.hash + + processor.set_produce_checkpoint(type: 'kafka', destination: 'orders') + + expect(processor.pathway_context.hash).not_to eq(initial_hash) + end + + it 'restarts pathway on consecutive same-direction checkpoints (loop detection)' do + processor.set_produce_checkpoint(type: 'kafka', destination: 'step1') + first_pathway_start = processor.pathway_context.pathway_start + + processor.set_produce_checkpoint(type: 'kafka', destination: 'step2') + + expect(processor.pathway_context.pathway_start).not_to eq(first_pathway_start) + expect(processor.pathway_context.pathway_start).to be >= first_pathway_start + end + end + + describe '#set_consume_checkpoint' do + it 'returns a hash' do + result = processor.set_consume_checkpoint(type: 'kafka', source: 'orders') + expect(result).to be_a(String) + expect(result).not_to be_empty + end + + it 'computes deterministic hash' do + processor.set_consume_checkpoint(type: 'kafka', source: 'orders', manual_checkpoint: false) + expect(processor.pathway_context.hash).to eq(KAFKA_ORDERS_CONSUME_HASH_WITHOUT_CARRIER) + end + + it 'can get a previous hash from the carrier' do + # Producer creates context in carrier + producer = described_class.new(interval: 10.0, logger: logger, settings: settings, agent_settings: agent_settings) + carrier = {} + producer.set_produce_checkpoint(type: 'kafka', destination: 'orders', manual_checkpoint: false) do |key, value| + carrier[key] = value + end + produce_hash = producer.pathway_context.hash + + # Consumer reads from carrier + processor.set_consume_checkpoint(type: 'kafka', source: 'orders', manual_checkpoint: false) do |key| + carrier[key] + end + + # Consumer hash is computed from producer hash (parent) + expect(processor.pathway_context.hash).to eq(KAFKA_ORDERS_CONSUME_HASH) + expect(processor.pathway_context.hash).not_to eq(produce_hash) + + producer.stop(true) + end + + it 'sets tags on the active_span for that hash' do + span = instance_double(Datadog::Tracing::SpanOperation) + allow(Datadog::Tracing).to receive(:active_span).and_return(span) + expect(span).to receive(:set_tag).with('pathway.hash', KAFKA_ORDERS_CONSUME_HASH_WITHOUT_CARRIER.to_s) + + processor.set_consume_checkpoint(type: 'kafka', source: 'orders', manual_checkpoint: false) + end + + it 'handles missing pathway context in carrier gracefully' do + carrier = {} + + expect do + processor.set_consume_checkpoint(type: 'kafka', source: 'orders') { |key| carrier[key] } + end.not_to raise_error + end + end + + describe 'pathway context tracking' do + it 'computes different hashes for different edge types' do + processor.set_produce_checkpoint(type: 'kafka', destination: 'orders', manual_checkpoint: false) + expect(processor.pathway_context.hash).to eq(KAFKA_ORDERS_PRODUCE_HASH) + + processor.set_produce_checkpoint(type: 'kinesis', destination: 'orders', manual_checkpoint: false) + expect(processor.pathway_context.hash).to eq(KINESIS_ORDERS_PRODUCE_HASH) + end + + it 'computes different hashes for different topics' do + processor.set_produce_checkpoint(type: 'kafka', destination: 'orders', manual_checkpoint: false) + expect(processor.pathway_context.hash).to eq(KAFKA_ORDERS_PRODUCE_HASH) + + processor.set_produce_checkpoint(type: 'kafka', destination: 'payments', manual_checkpoint: false) + expect(processor.pathway_context.hash).to eq(KAFKA_PAYMENTS_PRODUCE_HASH) + end + end + + describe 'produce-consume pathway flow' do + it 'maintains pathway continuity through produce and consume' do + 
produce_context = processor.set_produce_checkpoint(type: 'kafka', destination: 'orders') + produce_hash = processor.pathway_context.hash + produce_pathway_start = processor.pathway_context.pathway_start + + carrier = {Datadog::DataStreams::Processor::PROPAGATION_KEY => produce_context} + + processor.set_consume_checkpoint(type: 'kafka', source: 'orders') { |key| carrier[key] } + consume_hash = processor.pathway_context.hash + + expect(consume_hash).not_to eq(produce_hash) + expect(processor.pathway_context.pathway_start.to_f).to be_within(0.001).of(produce_pathway_start.to_f) + end + end + + describe 'internal bucket aggregation' do + it 'aggregates multiple checkpoints into DDSketch histograms' do + now = Time.now.to_f + + # Create multiple checkpoints with the same tags to aggregate + processor.set_produce_checkpoint(type: 'kafka', destination: 'topicA', manual_checkpoint: false) + processor.set_produce_checkpoint(type: 'kafka', destination: 'topicA', manual_checkpoint: false) + processor.set_produce_checkpoint(type: 'kafka', destination: 'topicA', manual_checkpoint: false) + + # Flush the event buffer to process checkpoints + processor.send(:process_events) + + # Access internal buckets to verify aggregation + expect(processor.buckets).not_to be_empty + + # Find the bucket for this time window + now_ns = (now * 1e9).to_i + bucket_time_ns = now_ns - (now_ns % processor.bucket_size_ns) + + bucket = processor.buckets[bucket_time_ns] + expect(bucket).not_to be_nil + + # Verify stats were aggregated for this pathway + pathway_stats = bucket[:pathway_stats] + expect(pathway_stats).not_to be_empty + + # At least one aggregation key should exist + aggr_key = pathway_stats.keys.first + stats = pathway_stats[aggr_key] + + # Verify DDSketch objects were populated + expect(stats[:edge_latency]).to be_a(Datadog::Core::DDSketch) + expect(stats[:full_pathway_latency]).to be_a(Datadog::Core::DDSketch) + + # Verify exactly 3 samples were recorded (matching Python test) + expect(stats[:edge_latency].count).to eq(3) + expect(stats[:full_pathway_latency].count).to eq(3) + + # Verify sketches can be encoded for serialization + expect(stats[:edge_latency].encode).to be_a(String) + expect(stats[:edge_latency].encode).not_to be_empty + expect(stats[:full_pathway_latency].encode).to be_a(String) + expect(stats[:full_pathway_latency].encode).not_to be_empty + end + end + end + + describe 'Kafka tracking methods' do + let(:base_time) { Time.now } + + after { processor.stop(true) } + + describe '#track_kafka_produce' do + it 'tracks produce offset for topic/partition' do + processor.track_kafka_produce('orders', 0, 100, base_time) + processor.track_kafka_produce('orders', 0, 101, base_time + 1) + + # Verify offset tracking works (metadata only, no stats sent) + expect { processor.send(:perform) }.not_to raise_error + end + + it 'tracks multiple produces to same topic/partition' do + processor.track_kafka_produce('orders', 0, 100, base_time) + processor.track_kafka_produce('orders', 0, 101, base_time + 1) + processor.track_kafka_produce('orders', 0, 102, base_time + 2) + + # Should track latest offset (verified in perform/flush) + expect { processor.track_kafka_produce('orders', 0, 103, base_time + 3) }.not_to raise_error + end + + it 'tracks produces to different partitions independently' do + processor.track_kafka_produce('orders', 0, 100, base_time) + processor.track_kafka_produce('orders', 1, 200, base_time) + processor.track_kafka_produce('orders', 2, 300, base_time) + + expect { processor.send(:perform) 
}.not_to raise_error + end + end + + describe '#track_kafka_consume' do + it 'accepts consume tracking calls without error' do + expect { + processor.track_kafka_consume('orders', 0, 100, base_time) + processor.track_kafka_consume('orders', 0, 101, base_time + 1) + processor.track_kafka_consume('payments', 1, 50, base_time + 2) + }.not_to raise_error + end + + it 'tracks sequential consumption' do + processor.track_kafka_consume('orders', 0, 100, base_time) + processor.track_kafka_consume('orders', 0, 101, base_time + 1) + processor.track_kafka_consume('orders', 0, 102, base_time + 2) + + expect { processor.send(:perform) }.not_to raise_error + end + + it 'detects gaps in consumption (lag)' do + processor.track_kafka_consume('orders', 0, 100, base_time) + # Gap: skipped 101-104 + processor.track_kafka_consume('orders', 0, 105, base_time + 1) + + # Should still track successfully despite gap + expect { processor.send(:perform) }.not_to raise_error + end + end + + describe 'end-to-end Kafka flow' do + it 'tracks complete produce -> consume lifecycle' do + # Producer writes message + processor.track_kafka_produce('orders', 0, 100, base_time) + + # Consumer reads message + processor.track_kafka_consume('orders', 0, 100, base_time + 1) + + # Should flush without errors + expect { processor.send(:perform) }.not_to raise_error + end + end + end +end diff --git a/spec/datadog/data_streams/spec_helper.rb b/spec/datadog/data_streams/spec_helper.rb new file mode 100644 index 00000000000..ee4acb0b184 --- /dev/null +++ b/spec/datadog/data_streams/spec_helper.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module DataStreamsHelpers + def skip_if_data_streams_not_supported(testcase) + testcase.skip("Data Streams Monitoring is not supported on JRuby") if PlatformHelpers.jruby? + testcase.skip("Data Streams Monitoring is not supported on TruffleRuby") if PlatformHelpers.truffleruby? + + # Data Streams Monitoring is not officially supported on macOS due to missing DDSketch binaries, + # but it's still useful to allow it to be enabled for development. + if PlatformHelpers.mac? && ENV["DD_DATA_STREAMS_MACOS_TESTING"] != "true" + testcase.skip( + "Data Streams Monitoring is not supported on macOS. If you still want to run these specs, you can use " \ + "DD_DATA_STREAMS_MACOS_TESTING=true to override this check." + ) + end + + return if Datadog::Core::DDSketch.supported? + + # Ensure DDSketch was loaded correctly + raise "DDSketch does not seem to be available: #{Datadog::Core::LIBDATADOG_API_FAILURE}. " \ + "Try running `bundle exec rake compile` before running this test." 
+ end +end + +RSpec.configure do |config| + config.include DataStreamsHelpers +end diff --git a/spec/datadog/data_streams/transport/http_spec.rb b/spec/datadog/data_streams/transport/http_spec.rb new file mode 100644 index 00000000000..50453b9deaf --- /dev/null +++ b/spec/datadog/data_streams/transport/http_spec.rb @@ -0,0 +1,82 @@ +require 'spec_helper' + +require 'datadog/data_streams/transport/http' + +RSpec.describe Datadog::DataStreams::Transport::HTTP do + let(:logger) { logger_allowing_debug } + + describe '.default' do + subject(:default) { described_class.default(agent_settings: agent_settings, logger: logger) } + + let(:adapter) { :net_http } + let(:ssl) { nil } + let(:hostname) { 'localhost' } + let(:port) { 8126 } + let(:uds_path) { nil } + let(:timeout_seconds) { nil } + + let(:agent_settings) do + Datadog::Core::Configuration::AgentSettings.new( + adapter: adapter, + ssl: ssl, + hostname: hostname, + port: port, + uds_path: uds_path, + timeout_seconds: timeout_seconds + ) + end + + it 'returns a DSM stats transport' do + is_expected.to be_a_kind_of(Datadog::DataStreams::Transport::Stats::Transport) + expect(default.current_api_id).to eq(Datadog::DataStreams::Transport::HTTP::API::V01) + + expect(default.apis.keys).to eq( + [ + Datadog::DataStreams::Transport::HTTP::API::V01, + ] + ) + end + + it 'configures the transport with correct API endpoint' do + api = default.apis[Datadog::DataStreams::Transport::HTTP::API::V01] + expect(api).to be_a_kind_of(Datadog::DataStreams::Transport::HTTP::Stats::API::Instance) + expect(api.spec).to be_a_kind_of(Datadog::DataStreams::Transport::HTTP::Stats::API::Spec) + expect(api.spec.stats.path).to eq('/v0.1/pipeline_stats') + end + + it 'configures the transport with correct headers' do + api = default.apis[Datadog::DataStreams::Transport::HTTP::API::V01] + expect(api.headers).to include( + 'Content-Type' => 'application/msgpack', + 'Content-Encoding' => 'gzip' + ) + expect(api.headers).to include(Datadog::Core::Transport::HTTP.default_headers) + end + + context 'with Net::HTTP adapter' do + let(:adapter) { :net_http } + let(:hostname) { 'custom-host' } + let(:port) { 8888 } + let(:ssl) { true } + + it 'configures the adapter correctly' do + api = default.apis[Datadog::DataStreams::Transport::HTTP::API::V01] + expect(api.adapter).to be_a_kind_of(Datadog::Core::Transport::HTTP::Adapters::Net) + expect(api.adapter.hostname).to eq(hostname) + expect(api.adapter.port).to eq(port) + expect(api.adapter.ssl).to eq(ssl) + end + end + + context 'with Unix socket adapter' do + let(:adapter) { :unix } + let(:uds_path) { '/var/run/datadog/apm.socket' } + + it 'configures the adapter correctly' do + api = default.apis[Datadog::DataStreams::Transport::HTTP::API::V01] + expect(api.adapter).to be_a_kind_of(Datadog::Core::Transport::HTTP::Adapters::UnixSocket) + expect(api.adapter.filepath).to eq(uds_path) + end + end + end +end diff --git a/spec/datadog/data_streams/transport/stats_spec.rb b/spec/datadog/data_streams/transport/stats_spec.rb new file mode 100644 index 00000000000..7969b6d11a8 --- /dev/null +++ b/spec/datadog/data_streams/transport/stats_spec.rb @@ -0,0 +1,80 @@ +require 'spec_helper' + +require 'datadog/data_streams/transport/stats' +require 'datadog/data_streams/transport/http/client' + +RSpec.describe Datadog::DataStreams::Transport::Stats do + let(:logger) { logger_allowing_debug } + + describe '::Transport' do + subject(:transport) { described_class::Transport.new(apis, default_api, logger: logger) } + + let(:default_api) { :v01 } + 
let(:apis) { {v01: api_instance} } + let(:api_instance) { instance_double(Datadog::Core::Transport::HTTP::API::Instance) } + let(:client) { instance_double(Datadog::DataStreams::Transport::HTTP::Client) } + + before do + allow(Datadog::DataStreams::Transport::HTTP::Client).to receive(:new) + .with(api_instance, logger: logger) + .and_return(client) + end + + describe '#send_stats' do + subject(:send_stats) { transport.send_stats(payload) } + + let(:payload) do + { + 'Service' => 'test-service', + 'TracerVersion' => '1.0.0', + 'Lang' => 'ruby', + 'Stats' => [ + { + 'Start' => 1000000000, + 'Duration' => 10000000000, + 'Stats' => [], + 'Backlogs' => [] + } + ] + } + end + + let(:response) { instance_double(Datadog::Core::Transport::HTTP::Response, ok?: true) } + + before do + allow(client).to receive(:send_stats_payload).and_return(response) + end + + it 'encodes payload with MessagePack' do + expect(MessagePack).to receive(:pack).with(payload).and_call_original + send_stats + end + + it 'compresses the MessagePack data with gzip' do + expect(Zlib).to receive(:gzip).and_call_original + send_stats + end + + it 'sends the compressed data via client' do + expect(client).to receive(:send_stats_payload) do |request| + expect(request).to be_a(Datadog::DataStreams::Transport::Stats::Request) + expect(request.parcel).to be_a(Datadog::DataStreams::Transport::Stats::EncodedParcel) + + # Verify the data is compressed MessagePack + compressed_data = request.parcel.data + decompressed = Zlib.gunzip(compressed_data) + unpacked = MessagePack.unpack(decompressed) + expect(unpacked).to eq(payload) + + response + end + + send_stats + end + + it 'returns the response' do + expect(send_stats).to eq(response) + end + end + end +end diff --git a/spec/datadog/tracing/contrib/kafka/data_streams_spec.rb b/spec/datadog/tracing/contrib/kafka/data_streams_spec.rb new file mode 100644 index 00000000000..5c41900bfad --- /dev/null +++ b/spec/datadog/tracing/contrib/kafka/data_streams_spec.rb @@ -0,0 +1,227 @@ +# frozen_string_literal: true + +require 'datadog/tracing/contrib/support/spec_helper' +require 'datadog/core' +require 'datadog/core/ddsketch' +require 'datadog/data_streams/spec_helper' +require 'ostruct' +require 'datadog/tracing/contrib/kafka/integration' +require 'datadog/tracing/contrib/kafka/instrumentation/producer' +require 'datadog/tracing/contrib/kafka/instrumentation/consumer' + +RSpec.describe 'Kafka Data Streams instrumentation' do + let(:configuration_options) { {} } + + before do + Datadog.configure do |c| + c.tracing.instrument :kafka, configuration_options + c.data_streams.enabled = true + end + end + + around do |example| + # Reset before and after each example; don't allow global state to linger. + Datadog.registry[:kafka].reset_configuration! + example.run + Datadog.registry[:kafka].reset_configuration! 
+ end + + describe 'pathway context' do + before do + skip_if_data_streams_not_supported(self) + end + + let(:test_producer_class) do + Class.new do + attr_accessor :pending_message_queue + + def initialize + @pending_message_queue = [] + end + + def deliver_messages(**kwargs) + # Mimic ruby-kafka behavior: operate on internal queue (no modification needed here) + result = {delivered_count: @pending_message_queue.size} + @pending_message_queue.clear + result + end + + prepend Datadog::Tracing::Contrib::Kafka::Instrumentation::Producer + end + end + + let(:producer) { test_producer_class.new } + let(:message) { OpenStruct.new(topic: 'test_topic', value: 'test_value', headers: {}) } + + it 'automatically injects pathway context when producing messages' do + # Test that the instrumentation automatically injects DSM headers + producer.pending_message_queue << message + producer.deliver_messages + + # Verify the header was automatically set by instrumentation + encoded_ctx = message.headers['dd-pathway-ctx-base64'] + expect(encoded_ctx).to be_a(String) + expect(encoded_ctx).not_to be_empty + + # Decode and verify it's a valid pathway context + decoded_ctx = Datadog::DataStreams::PathwayContext.decode_b64(encoded_ctx) + expect(decoded_ctx).to be_a(Datadog::DataStreams::PathwayContext) + expect(decoded_ctx.hash).to be > 0 # Should have a deterministic hash + expect(decoded_ctx.pathway_start).not_to be_nil + expect(decoded_ctx.pathway_start).to be_within(5).of(Time.now) # Should be recent + expect(decoded_ctx.current_edge_start).not_to be_nil + expect(decoded_ctx.current_edge_start).to be_within(5).of(Time.now) # Should be recent + end + end + + describe 'checkpointing' do + before do + skip_if_data_streams_not_supported(self) + end + + let(:test_producer_class) do + Class.new do + attr_accessor :pending_message_queue + + def initialize + @pending_message_queue = [] + end + + def deliver_messages(**kwargs) + result = {delivered_count: @pending_message_queue.size} + @pending_message_queue.clear + result + end + + prepend Datadog::Tracing::Contrib::Kafka::Instrumentation::Producer + end + end + + let(:test_consumer_class) do + Class.new do + attr_accessor :test_message + + def each_message(**kwargs) + # Yield the test message set by the test + yield(@test_message) if @test_message && block_given? 
+ end + + prepend Datadog::Tracing::Contrib::Kafka::Instrumentation::Consumer + end + end + + let(:consumer) { test_consumer_class.new } + + it 'automatically processes pathway context when consuming messages' do + # Simulate a complete produce → consume flow to test auto-instrumentation + processor = Datadog::DataStreams.send(:processor) + + # Step 1: Produce a message (instrumentation automatically adds pathway context) + producer_message = OpenStruct.new(topic: 'test_topic', value: 'test', headers: {}) + test_producer = test_producer_class.new + test_producer.pending_message_queue << producer_message + test_producer.deliver_messages + + # Capture the producer pathway context + producer_ctx_b64 = producer_message.headers['dd-pathway-ctx-base64'] + producer_ctx = Datadog::DataStreams::PathwayContext.decode_b64(producer_ctx_b64) + + # Step 2: Consume the message (instrumentation automatically processes pathway context) + consumer_message = OpenStruct.new( + topic: 'test_topic', + partition: 0, + offset: 100, + headers: {'dd-pathway-ctx-base64' => producer_ctx_b64} + ) + + # Set the message for the consumer to yield + consumer.test_message = consumer_message + + # Process the message - instrumentation should automatically call set_consume_checkpoint + consumer.each_message do |msg| + # By the time this block runs, the instrumentation has already: + # 1. Extracted the pathway context from message headers + # 2. Called set_consume_checkpoint + # 3. Updated the processor's internal pathway context + + # Verify the message still has the producer's pathway context in headers + expect(msg.headers['dd-pathway-ctx-base64']).to eq(producer_ctx_b64) + expect(msg.topic).to eq('test_topic') + + # Verify the processor has updated its context after processing this message + current_ctx = processor.instance_variable_get(:@pathway_context) + expect(current_ctx).to be_a(Datadog::DataStreams::PathwayContext) + expect(current_ctx.hash).to be > 0 + expect(current_ctx.hash).not_to eq(producer_ctx.hash) # Consumer hash should differ (direction:in vs direction:out) + expect(current_ctx.pathway_start).to be_within(0.001).of(producer_ctx.pathway_start) # Should preserve pathway start time (within 1ms due to serialization precision loss) + end + end + end + + describe 'when DSM is disabled' do + before do + Datadog.configure do |c| + c.tracing.instrument :kafka + c.data_streams.enabled = false + end + end + + let(:test_producer_class) do + Class.new do + attr_accessor :pending_message_queue + + def initialize + @pending_message_queue = [] + end + + def deliver_messages(**kwargs) + result = {delivered_count: @pending_message_queue.size} + @pending_message_queue.clear + result + end + + prepend Datadog::Tracing::Contrib::Kafka::Instrumentation::Producer + end + end + + let(:test_consumer_class) do + Class.new do + attr_accessor :test_message + + def each_message(**kwargs) + yield(@test_message) if @test_message && block_given? 
+ end + + prepend Datadog::Tracing::Contrib::Kafka::Instrumentation::Consumer + end + end + + it 'producer does not inject DSM headers when disabled' do + producer = test_producer_class.new + message = OpenStruct.new(topic: 'test_topic', value: 'test', headers: {}) + + producer.pending_message_queue << message + producer.deliver_messages + + # Should not have added DSM header + expect(message.headers).not_to include('dd-pathway-ctx-base64') + end + + it 'consumer does not process DSM headers when disabled' do + consumer = test_consumer_class.new + message = OpenStruct.new( + topic: 'test_topic', + partition: 0, + offset: 100, + headers: {'dd-pathway-ctx-base64' => 'some-context'} + ) + + consumer.test_message = message + + # Should not raise error even though DSM is disabled + expect { + consumer.each_message { |msg| expect(msg).to eq(message) } + }.not_to raise_error + end + end +end diff --git a/spec/datadog/tracing/contrib/karafka/data_streams_spec.rb b/spec/datadog/tracing/contrib/karafka/data_streams_spec.rb new file mode 100644 index 00000000000..5d4db387dde --- /dev/null +++ b/spec/datadog/tracing/contrib/karafka/data_streams_spec.rb @@ -0,0 +1,198 @@ +# frozen_string_literal: true + +require 'datadog/tracing/contrib/support/spec_helper' +require 'datadog/core' +require 'datadog/core/ddsketch' +require 'datadog/data_streams/spec_helper' +require 'karafka' +require 'ostruct' + +RSpec.describe 'Karafka Data Streams Integration' do + # Helper to create Karafka Messages using the real API + def build_karafka_messages(messages_data, topic_name = 'test_topic', partition = 0) + # Mock the topic with required methods (API changed between Karafka versions) + deserializer_mock = double('deserializer') + allow(deserializer_mock).to receive(:payload).and_return(double(call: nil)) + allow(deserializer_mock).to receive(:key).and_return(double(call: nil)) + allow(deserializer_mock).to receive(:headers).and_return(double(call: nil)) + + topic = double('Karafka::Routing::Topic', + name: topic_name, + deserializer: deserializer_mock, # Karafka 2.3.0 (singular) + deserializers: deserializer_mock, # Karafka 2.5+ (plural) + consumer_group: double(id: 'test_group')) + + raw_messages = messages_data.map do |data| + # Create metadata double + metadata = double('metadata') + allow(metadata).to receive(:partition).and_return(data[:partition] || partition) + allow(metadata).to receive(:offset).and_return(data[:offset] || 100) + allow(metadata).to receive(:headers).and_return(data[:headers] || {}) + allow(metadata).to receive(:raw_headers).and_return(data[:headers] || {}) + allow(metadata).to receive(:respond_to?).with(:raw_headers).and_return(true) + + # Create message double + msg = double('Karafka::Messages::Message') + allow(msg).to receive(:topic).and_return(data[:topic] || topic_name) + allow(msg).to receive(:partition).and_return(data[:partition] || partition) + allow(msg).to receive(:offset).and_return(data[:offset] || 100) + allow(msg).to receive(:headers).and_return(data[:headers] || {}) + allow(msg).to receive(:key).and_return(nil) + allow(msg).to receive(:payload).and_return(nil) + allow(msg).to receive(:timestamp).and_return(Time.now) + allow(msg).to receive(:metadata).and_return(metadata) + msg + end + + ::Karafka::Messages::Builders::Messages.call(raw_messages, topic, partition, Time.now) + end + + before do + Datadog.configure do |c| + c.tracing.instrument :karafka + c.data_streams.enabled = true + end + end + + after do + Datadog::DataStreams.send(:processor)&.stop(true) + end + + describe 
'auto-instrumentation' do + before do + skip_if_data_streams_not_supported(self) + end + + it 'automatically extracts and processes pathway context when consuming messages' do + processor = Datadog::DataStreams.send(:processor) + + # Producer creates pathway context (simulating message from another service) + producer_ctx_b64 = processor.set_produce_checkpoint(type: 'kafka', destination: 'orders') + producer_ctx = Datadog::DataStreams::PathwayContext.decode_b64(producer_ctx_b64) + + # Create Karafka message with the pathway context in headers + messages = build_karafka_messages([ + {topic: 'orders', partition: 0, offset: 100, headers: {'dd-pathway-ctx-base64' => producer_ctx_b64}} + ], 'orders') + + # When we call .each, auto-instrumentation automatically: + # 1. Extracts pathway context from headers + # 2. Calls set_consume_checkpoint + # 3. Updates the processor's pathway context + messages.each do |message| + # Verify message has the pathway context + expect(message.headers['dd-pathway-ctx-base64']).to eq(producer_ctx_b64) + + # Verify auto-instrumentation has processed it + current_ctx = processor.instance_variable_get(:@pathway_context) + expect(current_ctx).to be_a(Datadog::DataStreams::PathwayContext) + expect(current_ctx.hash).not_to eq(producer_ctx.hash) # Consume checkpoint has different hash + expect(current_ctx.pathway_start).to be_within(0.001).of(producer_ctx.pathway_start) # Same pathway (within 1ms due to serialization precision loss) + end + end + + it 'creates new pathway context when headers are missing' do + processor = Datadog::DataStreams.send(:processor) + + messages = build_karafka_messages([ + {topic: 'orders', partition: 0, offset: 100, headers: {}} + ], 'orders') + + # Auto-instrumentation should still create a consume checkpoint even without headers + messages.each { |_message| } + + new_ctx = processor.instance_variable_get(:@pathway_context) + expect(new_ctx).to be_a(Datadog::DataStreams::PathwayContext) + expect(new_ctx.hash).to be > 0 + end + + it 'processes multiple messages in a batch' do + processor = Datadog::DataStreams.send(:processor) + + messages = build_karafka_messages([ + {topic: 'orders', partition: 0, offset: 100}, + {topic: 'orders', partition: 0, offset: 101}, + {topic: 'orders', partition: 0, offset: 102} + ], 'orders') + + message_count = 0 + expect { + messages.each do |_message| + message_count += 1 + # Each message gets auto-instrumentation + end + }.not_to raise_error + + expect(message_count).to eq(3) + expect(processor.pathway_context.hash).to be > 0 + end + end + + describe 'pathway propagation across services' do + before do + skip_if_data_streams_not_supported(self) + end + + it 'maintains pathway continuity through produce → consume → produce chain' do + processor = Datadog::DataStreams.send(:processor) + + # Service A: Producer creates initial pathway + ctx_a_b64 = processor.set_produce_checkpoint(type: 'kafka', destination: 'orders-topic') + ctx_a = Datadog::DataStreams::PathwayContext.decode_b64(ctx_a_b64) + + # Service B: Consumes from Service A (auto-instrumentation processes it) + messages_from_a = build_karafka_messages([ + {topic: 'orders-topic', partition: 0, offset: 100, headers: {'dd-pathway-ctx-base64' => ctx_a_b64}} + ], 'orders-topic') + + messages_from_a.each { |_msg| } # Auto-instrumentation runs here + + ctx_b_consume = processor.instance_variable_get(:@pathway_context) + expect(ctx_b_consume.hash).not_to eq(ctx_a.hash) # Consume creates new checkpoint + expect(ctx_b_consume.pathway_start).to 
be_within(0.001).of(ctx_a.pathway_start) # Same pathway (within 1ms due to serialization precision loss) + + # Service B: Produces to next topic + ctx_b_produce_b64 = processor.set_produce_checkpoint(type: 'kafka', destination: 'processed-orders') + ctx_b_produce = Datadog::DataStreams::PathwayContext.decode_b64(ctx_b_produce_b64) + + # Verify it's still the same pathway (within 1ms due to serialization precision loss) + expect(ctx_b_produce.pathway_start).to be_within(0.001).of(ctx_a.pathway_start) + + # Service C: Consumes from Service B (auto-instrumentation processes it) + messages_from_b = build_karafka_messages([ + {topic: 'processed-orders', partition: 0, offset: 200, headers: {'dd-pathway-ctx-base64' => ctx_b_produce_b64}} + ], 'processed-orders') + + messages_from_b.each { |_msg| } # Auto-instrumentation runs here + + ctx_c = processor.instance_variable_get(:@pathway_context) + expect(ctx_c).to be_a(Datadog::DataStreams::PathwayContext) + expect(ctx_c.hash).to be > 0 + expect(ctx_c.pathway_start).to be_within(0.001).of(ctx_a.pathway_start) # Still same original pathway (within 1ms due to serialization precision loss) + + # Verify pathway progressed through all services + # At minimum, consume from A should create different hash than initial produce + expect(ctx_b_consume.hash).not_to eq(ctx_a.hash) + end + end + + describe 'when DSM is disabled' do + before do + Datadog.configure do |c| + c.tracing.instrument :karafka + c.data_streams.enabled = false + end + end + + it 'skips DSM processing' do + messages = build_karafka_messages([ + {topic: 'orders', partition: 0, offset: 100, headers: {'dd-pathway-ctx-base64' => 'some-context'}} + ], 'orders') + + # Should not raise error even though DSM is disabled + expect { + messages.each { |_message| } + }.not_to raise_error + end + end +end diff --git a/spec/loading_spec.rb b/spec/loading_spec.rb index 2ea8c4e28f0..1bcba89507b 100644 --- a/spec/loading_spec.rb +++ b/spec/loading_spec.rb @@ -5,6 +5,7 @@ {require: 'datadog', check: 'Datadog::Core'}, {require: 'datadog/appsec', check: 'Datadog::AppSec'}, {require: 'datadog/core', check: 'Datadog::Core'}, + {require: 'datadog/data_streams', check: 'Datadog::DataStreams'}, {require: 'datadog/error_tracking', check: 'Datadog::ErrorTracking'}, {require: 'datadog/di', check: 'Datadog::DI', env: {DD_DYNAMIC_INSTRUMENTATION_ENABLED: 'false'}, diff --git a/supported-configurations.json b/supported-configurations.json index 6a75b00cd55..a52d162b18b 100644 --- a/supported-configurations.json +++ b/supported-configurations.json @@ -79,6 +79,9 @@ "DD_CRASHTRACKING_ENABLED": { "version": ["A"] }, + "DD_DATA_STREAMS_ENABLED": { + "version": ["A"] + }, "DD_DBM_PROPAGATION_MODE": { "version": ["A"] },