Skip to content

Commit 895d80e

Browse files
authored
DEBUG-3558 DI: chunk snapshot payloads (#5086)
1 parent f88393d commit 895d80e

File tree

7 files changed

+192
-9
lines changed

7 files changed

+192
-9
lines changed

lib/datadog/core/utils/array.rb

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# frozen_string_literal: true

module Datadog
  module Core
    module Utils
      # Common array-related utility functions.
      module Array
        # Maps +array+ through +block+ and discards nil results.
        #
        # Provides Enumerable#filter_map semantics on Rubies older than
        # 2.7, where that method does not exist.
        def self.filter_map(array, &block)
          # DEV Supported since Ruby 2.7, saves an intermediate object creation
          return array.filter_map(&block) if array.respond_to?(:filter_map)

          if array.is_a?(Enumerator::Lazy)
            # You would think that .compact would work here, but it does not:
            # the result of .map could be an Enumerator::Lazy instance which
            # does not implement #compact on Ruby 2.5/2.6.
            array.map(&block).reject(&:nil?)
          else
            array.each_with_object([]) do |element, result|
              mapped = block.call(element)
              result.push(mapped) unless mapped.nil?
            end
          end
        end
      end
    end
  end
end

lib/datadog/di/transport/http.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,13 @@ def input(
4040
api_version: nil,
4141
headers: nil
4242
)
43-
Core::Transport::HTTP.build(api_instance_class: Input::API::Instance,
43+
Core::Transport::HTTP.build(
44+
api_instance_class: Input::API::Instance,
4445
logger: logger,
45-
agent_settings: agent_settings, api_version: api_version, headers: headers) do |transport|
46+
agent_settings: agent_settings,
47+
api_version: api_version,
48+
headers: headers,
49+
) do |transport|
4650
apis = API.defaults
4751

4852
transport.api API::INPUT, apis[API::INPUT]

lib/datadog/di/transport/input.rb

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
# frozen_string_literal: true
22

3+
require_relative '../../core/chunker'
4+
require_relative '../../core/encoding'
5+
require_relative '../../core/tag_builder'
36
require_relative '../../core/transport/parcel'
7+
require_relative '../../core/transport/request'
8+
require_relative '../error'
49
require_relative 'http/input'
510

611
module Datadog
@@ -24,6 +29,25 @@ def initialize(parcel, serialized_tags)
2429
class Transport
2530
attr_reader :client, :apis, :default_api, :current_api_id, :logger
2631

32+
# The limit on an individual snapshot payload, aka "log line",
33+
# is 1 MB.
34+
#
35+
# TODO There is an RFC for snapshot pruning that should be
36+
# implemented to reduce the size of snapshots to be below this
37+
# limit, so that we can send a portion of the captured data
38+
# rather than dropping the snapshot entirely.
39+
MAX_SERIALIZED_SNAPSHOT_SIZE = 1024 * 1024
40+
41+
# The maximum chunk (batch) size that intake permits is 5 MB.
42+
#
43+
# Two bytes are for the [ and ] of JSON array syntax.
44+
MAX_CHUNK_SIZE = 5 * 1024 * 1024 - 2
45+
46+
# Try to send smaller payloads to avoid large network requests.
47+
# If a payload is larger than default chunk size but is under the
48+
# max chunk size, it will still get sent out.
49+
DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024
50+
2751
def initialize(apis, default_api, logger:)
2852
@apis = apis
2953
@logger = logger
@@ -36,9 +60,45 @@ def current_api
3660
end
3761

3862
# Sends a batch of DI snapshots to the agent, splitting the batch
# into multiple requests ("chunks") so that each request stays
# under the intake size limits.
#
# Snapshots whose individual JSON serialization exceeds
# MAX_SERIALIZED_SNAPSHOT_SIZE are dropped (with a debug log),
# since intake would reject them anyway.
#
# Returns the original +payload+.
#
# NOTE(review): +payload+ is presumably an Enumerable of snapshot
# hashes and +tags+ a tag collection accepted by
# Core::TagBuilder.serialize_tags — confirm against callers.
def send_input(payload, tags)
  # Tags are the same for all chunks, serialize them one time.
  serialized_tags = Core::TagBuilder.serialize_tags(tags)

  encoder = Core::Encoding::JSONEncoder
  encoded_snapshots = Core::Utils::Array.filter_map(payload) do |snapshot|
    encoded = encoder.encode(snapshot)
    if encoded.length > MAX_SERIALIZED_SNAPSHOT_SIZE
      # Drop the snapshot.
      # TODO report via telemetry metric?
      logger.debug { "di: dropping too big snapshot" }
      nil
    else
      encoded
    end
  end

  Datadog::Core::Chunker.chunk_by_size(
    encoded_snapshots, DEFAULT_CHUNK_SIZE,
  ).each do |chunk|
    # We drop snapshots that are too big earlier.
    # The limit on chunked payload length here is greater
    # than the limit on snapshot size, therefore no chunks
    # can exceed limits here.
    chunked_payload = encoder.join(chunk)

    # We need to rescue exceptions for each chunk so that
    # subsequent chunks are attempted to be sent.
    begin
      send_input_chunk(chunked_payload, serialized_tags)
    rescue => exc
      logger.debug { "di: failed to send snapshot chunk: #{exc.class}: #{exc} (at #{exc.backtrace.first})" }
    end
  end

  payload
end
100+
def send_input_chunk(chunked_payload, serialized_tags)
101+
parcel = EncodedParcel.new(chunked_payload)
42102
request = Request.new(parcel, serialized_tags)
43103

44104
response = @client.send_input_payload(request)

lib/datadog/tracing/transport/traces.rb

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require_relative '../../core/chunker'
44
require_relative '../../core/transport/parcel'
55
require_relative '../../core/transport/request'
6+
require_relative '../../core/utils/array'
67
require_relative 'serializable_trace'
78
require_relative 'trace_formatter'
89

@@ -65,11 +66,8 @@ def initialize(encoder, logger:, native_events_supported:, max_size: DEFAULT_MAX
6566
# @return [Enumerable[Array[Bytes,Integer]]] list of encoded chunks: each containing a byte array and
6667
# number of traces
6768
def encode_in_chunks(traces)
68-
encoded_traces = if traces.respond_to?(:filter_map)
69-
# DEV Supported since Ruby 2.7, saves an intermediate object creation
70-
traces.filter_map { |t| encode_one(t) }
71-
else
72-
traces.map { |t| encode_one(t) }.reject(&:nil?)
69+
encoded_traces = Core::Utils::Array.filter_map(traces) do |trace|
70+
encode_one(trace)
7371
end
7472

7573
Datadog::Core::Chunker.chunk_by_size(encoded_traces, max_size).map do |chunk|

sig/datadog/core/utils/array.rbs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module Datadog
  module Core
    module Utils
      # Type signatures for the shared array utility helpers in
      # lib/datadog/core/utils/array.rb.
      module Array
        # Maps each element through the block and discards nil results
        # (Enumerable#filter_map semantics for Rubies < 2.7).
        def self.filter_map: (::Array[any] array) { (any) -> any } -> ::Array[any]
      end
    end
  end
end

sig/datadog/di/transport/input.rbs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,17 @@ module Datadog
3030
attr_reader current_api_id: untyped
3131

3232
attr_reader logger: untyped
33+
MAX_SERIALIZED_SNAPSHOT_SIZE: ::Integer
34+
MAX_CHUNK_SIZE: ::Integer
35+
DEFAULT_CHUNK_SIZE: ::Integer
3336

3437
def initialize: (untyped apis, untyped default_api, logger: untyped) -> void
3538

3639
def current_api: () -> untyped
3740

3841
def send_input: (untyped payload, untyped tags) -> untyped
42+
43+
def send_input_chunk: (untyped chunked_payload, untyped serialized_tags) -> untyped
3944
end
4045
end
4146
end
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Tests for DI snapshot payload chunking in
# Datadog::DI::Transport::Input::Transport#send_input:
# large batches are split into multiple requests and oversized
# individual snapshots are dropped.
require "datadog/di/spec_helper"
require 'datadog/di/transport/http'

RSpec.describe Datadog::DI::Transport::Input::Transport do
  di_test

  let(:transport) do
    Datadog::DI::Transport::HTTP.input(agent_settings: agent_settings, logger: logger)
  end

  let(:agent_settings) { Datadog::Core::Configuration::AgentSettingsResolver.call(settings, logger: nil) }

  let(:settings) do
    Datadog::Core::Configuration::Settings.new
  end

  let(:logger) do
    instance_double(Logger)
  end

  let(:tags) { [] }

  context 'when the combined size of snapshots serialized exceeds intake max' do
    before do
      # Reduce limits to make the test run faster and not require a lot of memory
      stub_const('Datadog::DI::Transport::Input::Transport::DEFAULT_CHUNK_SIZE', 100_000)
      stub_const('Datadog::DI::Transport::Input::Transport::MAX_CHUNK_SIZE', 200_000)
    end

    let(:snapshot) do
      # It doesn't matter what the payload is, generate a fake one here.
      # This payload serializes to 9781 bytes of JSON.
      1000.times.map do |i|
        [i, i]
      end.to_h
    end

    let(:snapshots) do
      # This serializes to 978201 bytes of JSON - just under 1 MB.
      [snapshot] * 100
    end

    it 'chunks snapshots' do
      # Just under 1 MB payload, default chunk size ~100 KB, we expect 10 chunks.
      # Stubbing send_input_chunk keeps the test from performing network I/O.
      expect(transport).to receive(:send_input_chunk).exactly(10).times do |chunked_payload, serialized_tags|
        expect(chunked_payload.length).to be < 100_000
        expect(chunked_payload.length).to be > 90_000
      end
      transport.send_input(snapshots, tags)
    end

    context 'when individual snapshot exceeds intake max' do
      before do
        # Reduce limits even more to force a reasonably-sized snapshot to be dropped
        stub_const('Datadog::DI::Transport::Input::Transport::MAX_SERIALIZED_SNAPSHOT_SIZE', 2_000)
      end

      # Small enough to stay under the reduced 2 KB snapshot limit.
      let(:small_snapshot) do
        20.times.map do |i|
          [i, i]
        end.to_h
      end

      # One snapshot that fits and one (from the outer context) that
      # exceeds the reduced per-snapshot limit and must be dropped.
      let(:snapshots) do
        [small_snapshot, snapshot]
      end

      it 'drops snapshot that is too big' do
        expect(transport).to receive(:send_input_chunk).once do |chunked_payload, serialized_tags|
          expect(chunked_payload.length).to be < 1_000
          expect(chunked_payload.length).to be > 100
        end
        expect_lazy_log(logger, :debug, 'di: dropping too big snapshot')
        transport.send_input(snapshots, tags)
      end
    end
  end
end

0 commit comments

Comments
 (0)