Skip to content

Commit

Permalink
Merge pull request #48 from DataDog/remeh/v2-routes
Browse files Browse the repository at this point in the history
Support sending logs to Datadog v2 endpoints.
  • Loading branch information
remeh authored Oct 18, 2021
2 parents b6f8bf7 + 49f8e4e commit 9e52dd3
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 17 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,4 @@ foo/
*.iml
.idea/

fluent/
Gemfile.lock
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 0.14.0
- Support Datadog v2 endpoints [#48](https://github.com/DataDog/fluent-plugin-datadog/pull/48)

## 0.13.0
- Support HTTP proxies [#46](https://github.com/DataDog/fluent-plugin-datadog/pull/46)

Expand Down
8 changes: 5 additions & 3 deletions fluent-plugin-datadog.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@
lib = File.expand_path('../lib', __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

require "fluent/plugin/version.rb"

Gem::Specification.new do |spec|
spec.name = "fluent-plugin-datadog"
spec.version = "0.13.0"
spec.version = DatadogFluentPlugin::VERSION
spec.authors = ["Datadog Solutions Team"]
spec.email = ["[email protected]"]
spec.summary = "Datadog output plugin for Fluent event collector"
spec.homepage = "http://datadoghq.com"
spec.license = "Apache-2.0"

spec.files = [".gitignore", "Gemfile", "LICENSE", "README.md", "Rakefile", "fluent-plugin-datadog.gemspec", "lib/fluent/plugin/out_datadog.rb"]
spec.files = [".gitignore", "Gemfile", "LICENSE", "README.md", "Rakefile", "fluent-plugin-datadog.gemspec", "lib/fluent/plugin/version.rb", "lib/fluent/plugin/out_datadog.rb"]
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]
Expand All @@ -28,5 +30,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "test-unit", '~> 3.1'
spec.add_development_dependency "rake", "~> 12.0"
spec.add_development_dependency "yajl-ruby", "~> 1.2"
spec.add_development_dependency 'webmock', "~> 3.5.0"
spec.add_development_dependency 'webmock', "~> 3.6.0"
end
27 changes: 20 additions & 7 deletions lib/fluent/plugin/out_datadog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
require "zlib"
require "fluent/plugin/output"

require_relative "version"

class Fluent::DatadogOutput < Fluent::Plugin::Output
class RetryableError < StandardError;
end
Expand Down Expand Up @@ -50,6 +52,7 @@ class RetryableError < StandardError;
config_param :compression_level, :integer, :default => 6
config_param :no_ssl_validation, :bool, :default => false
config_param :http_proxy, :string, :default => nil
config_param :force_v1_routes, :bool, :default => false

# Format settings
config_param :use_json, :bool, :default => true
Expand Down Expand Up @@ -89,7 +92,7 @@ def formatted_to_msgpack_binary?

def start
super
@client = new_client(log, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @ssl_port, @port, @http_proxy, @use_compression)
@client = new_client(log, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @ssl_port, @port, @http_proxy, @use_compression, @force_v1_routes)
end

def shutdown
Expand Down Expand Up @@ -261,9 +264,9 @@ def gzip_compress(payload, compression_level)
end

# Build a new transport client
def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression)
def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, force_v1_routes)
if use_http
DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, api_key
DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, api_key, force_v1_routes
else
DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, ssl_port, port
end
Expand Down Expand Up @@ -301,20 +304,29 @@ class DatadogHTTPClient < DatadogClient
require 'net/http'
require 'net/http/persistent'

def initialize(logger, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, api_key)
def initialize(logger, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, api_key, force_v1_routes = false)
@logger = logger
protocol = use_ssl ? "https" : "http"
port = use_ssl ? ssl_port : port
@uri = URI("#{protocol}://#{host}:#{port.to_s}/v1/input/#{api_key}")
if force_v1_routes
@uri = URI("#{protocol}://#{host}:#{port.to_s}/v1/input/#{api_key}")
else
@uri = URI("#{protocol}://#{host}:#{port.to_s}/api/v2/logs")
end
proxy_uri = :ENV
if http_proxy
proxy_uri = URI.parse(http_proxy)
elsif ENV['HTTP_PROXY'] || ENV['http_proxy']
logger.info("Using HTTP proxy defined in `HTTP_PROXY`/`http_proxy` env vars")
end
logger.info("Starting HTTP connection to #{protocol}://#{host}:#{port.to_s} with compression " + (use_compression ? "enabled" : "disabled"))
logger.info("Starting HTTP connection to #{protocol}://#{host}:#{port.to_s} with compression " + (use_compression ? "enabled" : "disabled") + (force_v1_routes ? " using v1 routes" : " using v2 routes"))
@client = Net::HTTP::Persistent.new name: "fluent-plugin-datadog-logcollector", proxy: proxy_uri
@client.verify_mode = OpenSSL::SSL::VERIFY_NONE if no_ssl_validation
unless force_v1_routes
@client.override_headers["DD-API-KEY"] = api_key
@client.override_headers["DD-EVP-ORIGIN"] = "fluent"
@client.override_headers["DD-EVP-ORIGIN-VERSION"] = DatadogFluentPlugin::VERSION
end
@client.override_headers["Content-Type"] = "application/json"
if use_compression
@client.override_headers["Content-Encoding"] = "gzip"
Expand All @@ -330,7 +342,8 @@ def send(payload)
request.body = payload
response = @client.request @uri, request
res_code = response.code.to_i
if res_code >= 500
# on a backend error or on an http 429, retry with backoff
if res_code >= 500 || res_code == 429
raise RetryableError.new "Unable to send payload: #{res_code} #{response.message}"
end
if res_code >= 400
Expand Down
5 changes: 5 additions & 0 deletions lib/fluent/plugin/version.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# frozen_string_literal: true

module DatadogFluentPlugin
VERSION = '0.14.0'
end
81 changes: 75 additions & 6 deletions test/plugin/test_out_datadog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require "fluent/test/helpers"
require "fluent/test/driver/output"
require "fluent/plugin/out_datadog"
require "fluent/plugin/version"
require 'webmock/test_unit'

class FluentDatadogTest < Test::Unit::TestCase
Expand Down Expand Up @@ -210,31 +211,82 @@ def create_valid_subject
end
end

sub_test_case "http connection errors" do
# v1 routes
sub_test_case "http connection errors (v1 routes)" do
test "should retry when server is returning 5XX" do
api_key = 'XXX'
stub_dd_request_with_return_code(api_key, 500)
payload = '{}'
client = Fluent::DatadogOutput::DatadogHTTPClient.new Logger.new(STDOUT), false, false, "datadog.com", 443, 80, nil, false, api_key
client = Fluent::DatadogOutput::DatadogHTTPClient.new Logger.new(STDOUT), false, false, "datadog.com", 443, 80, nil, false, api_key, true
assert_raise(Fluent::DatadogOutput::RetryableError) do
client.send(payload)
end
end

test "should not retry when server is returning 4XX" do
# note that in theory, v1 routes should not return a 429 but still
# we added this mechanism for v1 routes while implementing v2 ones
test "should retry when server is returning 429 (v1 routes)" do
api_key = 'XXX'
stub_dd_request_with_return_code(api_key, 429)
payload = '{}'
client = Fluent::DatadogOutput::DatadogHTTPClient.new Logger.new(STDOUT), false, false, "datadog.com", 443, 80, nil, false, api_key, true
assert_raise(Fluent::DatadogOutput::RetryableError) do
client.send(payload)
end
end

test "should not retry when server is returning 4XX (v1 routes)" do
api_key = 'XXX'
stub_dd_request_with_return_code(api_key, 400)
payload = '{}'
client = Fluent::DatadogOutput::DatadogHTTPClient.new Logger.new(STDOUT), false, false, "datadog.com", 443, 80, nil, false, api_key, true
assert_nothing_raised do
client.send(payload)
end
end
end

# v2 routes
sub_test_case "http connection errors (v2 routes)" do
test "should retry when server is returning 5XX" do
api_key = 'XXX'
stub_dd_request_with_return_code(api_key, 500, true)
payload = '{}'
client = Fluent::DatadogOutput::DatadogHTTPClient.new Logger.new(STDOUT), false, false, "datadog.com", 443, 80, nil, false, api_key
assert_raise(Fluent::DatadogOutput::RetryableError) do
client.send(payload)
end
end

test "should retry when server is returning 429 (v2 routes)" do
api_key = 'XXX'
stub_dd_request_with_return_code(api_key, 429, true)
payload = '{}'
client = Fluent::DatadogOutput::DatadogHTTPClient.new Logger.new(STDOUT), false, false, "datadog.com", 443, 80, nil, false, api_key
assert_raise(Fluent::DatadogOutput::RetryableError) do
client.send(payload)
end
end

test "should not retry when server is returning 4XX (v2 routes)" do
api_key = 'XXX'
stub_dd_request_with_return_code(api_key, 400, true)
payload = '{}'
client = Fluent::DatadogOutput::DatadogHTTPClient.new Logger.new(STDOUT), false, false, "datadog.com", 443, 80, nil, false, api_key
assert_nothing_raised do
client.send(payload)
end
end
end

def stub_dd_request_with_return_code(api_key, return_code)
stub_dd_request(api_key).
to_return(status: return_code, body: "", headers: {})
def stub_dd_request_with_return_code(api_key, return_code, v2_routes = false)
if v2_routes
stub_dd_request_v2_routes(api_key).
to_return(status: return_code, body: "", headers: {})
else
stub_dd_request(api_key).
to_return(status: return_code, body: "", headers: {})
end
end

def stub_dd_request_with_error(api_key, error)
Expand All @@ -255,4 +307,21 @@ def stub_dd_request(api_key)
'User-Agent' => 'Ruby'
})
end

def stub_dd_request_v2_routes(api_key)
stub_request(:post, "http://datadog.com/api/v2/logs").
with(
body: "{}",
headers: {
'Accept'=>'*/*',
'Accept-Encoding'=>'gzip;q=1.0,deflate;q=0.6,identity;q=0.3',
'Connection'=>'keep-alive',
'Content-Type'=>'application/json',
'Dd-Api-Key'=> "#{api_key}",
'Dd-Evp-Origin'=>'fluent',
'Dd-Evp-Origin-Version'=> "#{DatadogFluentPlugin::VERSION}",
'Keep-Alive'=>'30',
'User-Agent'=>'Ruby'
})
end
end

0 comments on commit 9e52dd3

Please sign in to comment.