Skip to content

Commit a307db2

Browse files
committed
ES|QL support: ESQL executor implementation, response type to accept esql option, validations to make sure both LS and ES support the ESQL execution.
1 parent d9bf375 commit a307db2

File tree

6 files changed

+334
-6
lines changed

6 files changed

+334
-6
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 5.2.0
2+
- ES|QL support [#233](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/233)
3+
14
## 5.1.0
25
- Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205)
36

lib/logstash/inputs/elasticsearch.rb

+49-5
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
7474
require 'logstash/inputs/elasticsearch/paginated_search'
7575
require 'logstash/inputs/elasticsearch/aggregation'
7676
require 'logstash/inputs/elasticsearch/cursor_tracker'
77+
require 'logstash/inputs/elasticsearch/esql'
7778

7879
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
7980
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -104,7 +105,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
104105
# This allows you to specify the response type: hits, aggregations or esql
105106
# where hits: normal search request
106107
# aggregations: aggregation request
107-
config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits'
108+
config :response_type, :validate => %w[hits aggregations esql], :default => 'hits'
108109

109110
# This allows you to set the maximum number of hits returned per scroll.
110111
config :size, :validate => :number, :default => 1000
@@ -302,10 +303,17 @@ def register
302303
fill_hosts_from_cloud_id
303304
setup_ssl_params!
304305

305-
@base_query = LogStash::Json.load(@query)
306-
if @slices
307-
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
308-
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
306+
if @response_type == 'esql'
307+
validate_ls_version_for_esql_support!
308+
validate_esql_query!
309+
inform_ineffective_esql_params
310+
else
311+
# ES|QL mode takes the query as a raw string; every other response type expects a JSON query
312+
@base_query = LogStash::Json.load(@query)
313+
if @slices
314+
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
315+
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
316+
end
309317
end
310318

311319
@retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")
@@ -341,6 +349,9 @@ def register
341349

342350
test_connection!
343351

352+
# make sure connected ES supports ES|QL (8.11+)
353+
validate_es_for_esql_support! if @response_type == 'esql'
354+
344355
setup_serverless
345356

346357
setup_search_api
@@ -398,6 +409,12 @@ def event_from_hit(hit, root_field)
398409
return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure'])
399410
end
400411

412+
# Builds an event from a single mapped entry, decorates it and enqueues it.
#
# The event is created through `targeted_event_factory`, passed to `decorate`,
# and then appended to the queue with `<<`.
#
# @param output_queue [#<<] destination the finished event is appended to
# @param mapped_entry [Hash] field name => value data used to build the event
# @return [Object] whatever `output_queue << event` returns
def decorate_and_push_to_queue(output_queue, mapped_entry)
  event = targeted_event_factory.new_event(mapped_entry)
  decorate(event)
  output_queue << event
end
417+
401418
def set_docinfo_fields(hit, event)
402419
# do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
403420
docinfo_target = event.get(@docinfo_target) || {}
@@ -675,6 +692,8 @@ def setup_query_executor
675692
end
676693
when 'aggregations'
677694
LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
695+
when 'esql'
696+
LogStash::Inputs::Elasticsearch::Esql.new(@client, self)
678697
end
679698
end
680699

@@ -714,6 +733,31 @@ def get_transport_client_class
714733
::Elastic::Transport::Transport::HTTP::Manticore
715734
end
716735

736+
# Ensures the running Logstash is recent enough to run ES|QL queries.
#
# LS 8.17.4+ has elasticsearch-ruby 8.17 client;
# elasticsearch-ruby 8.11+ supports ES|QL.
#
# @raise [RuntimeError] when LOGSTASH_VERSION is lower than 8.17.4
# @return [nil] when the version requirement is met
def validate_ls_version_for_esql_support!
  minimum_ls_version = Gem::Version.create("8.17.4")
  return unless Gem::Version.create(LOGSTASH_VERSION) < minimum_ls_version

  fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4")
end
743+
744+
# Validates that `@query` looks like an ES|QL query.
#
# The query must be non-blank and must begin (after leading whitespace) with
# one of the source commands FROM, ROW or SHOW. The keyword comparison is
# case-insensitive, so `from my-index` is accepted just like `FROM my-index`.
#
# @raise [LogStash::ConfigurationError] when the query is blank or does not
#   start with a source command
# @return [nil] when the query passes validation
def validate_esql_query!
  esql_query = @query.strip
  fail(LogStash::ConfigurationError, "`query` cannot be empty") if esql_query.empty?

  source_commands = %w[FROM ROW SHOW]
  # upcase the query so the prefix check does not depend on keyword casing
  contains_source_command = esql_query.upcase.start_with?(*source_commands)
  fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command
end
750+
751+
# Logs an informational message listing the explicitly configured options
# that this method treats as having no effect in ES|QL mode.
#
# Only options present in `original_params` are reported, and the message is
# emitted as soon as at least one such option is configured.
#
# NOTE(review): `target` is listed here, but ES|QL events are built through
# `targeted_event_factory` — confirm whether `target` is really ineffective.
#
# @return [nil, Object] nil when nothing is reported, otherwise the logger's result
def inform_ineffective_esql_params
  ineffective_options = original_params.keys & %w(target size slices search_api)
  return if ineffective_options.empty?

  @logger.info("Configured #{ineffective_options} params are ineffective in ES|QL mode")
end
755+
756+
# Ensures the connected Elasticsearch is recent enough (8.11+) for ES|QL.
#
# `es_version` is read once so the comparison and the error message are
# guaranteed to refer to the same value.
#
# @raise [RuntimeError] when the connected Elasticsearch version is lower than 8.11
# @return [nil] when the version requirement is met
def validate_es_for_esql_support!
  connected_es_version = es_version
  return if Gem::Version.create(connected_es_version) >= Gem::Version.create("8.11")

  fail("Connected Elasticsearch #{connected_es_version} version does not support ES|QL. Please upgrade it.")
end
760+
717761
module URIOrEmptyValidator
718762
##
719763
# @override to provide :uri_or_empty validator
+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
require 'logstash/helpers/loggable_try'
2+
3+
module LogStash
  module Inputs
    class Elasticsearch
      # Query executor for the `esql` response type.
      #
      # Sends the configured ES|QL query through the client's `esql.query` API,
      # turns every returned row into a column-name => value hash and hands it
      # to the parent plugin for event creation and queueing.
      class Esql
        include LogStash::Util::Loggable

        ESQL_JOB = "ES|QL job"

        # Initialize the ESQL query executor
        # @param client [Elasticsearch::Client] The Elasticsearch client instance
        # @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance
        def initialize(client, plugin)
          @client = client
          @plugin_params = plugin.params
          @plugin = plugin
          @retries = @plugin_params["retries"]
          @query = @plugin_params["query"]
        end

        # Execute the ESQL query and process results
        #
        # Returns early, without pushing anything, when the query could not be
        # executed (the retry helper yields a falsy result).
        #
        # @param output_queue [Queue] The queue to push processed events to
        def do_run(output_queue)
          logger.info("ES|QL executor starting")
          response = retryable(ESQL_JOB) do
            @client.esql.query({ body: { query: @query }, format: 'json' })
          end
          # retryable already logged the error details; also guards against nil
          return unless response

          warning = response.headers&.dig("warning")
          logger.warn("ES|QL executor received warning", {:message => warning}) if warning

          values = response['values']
          columns = response['columns']
          process_response(values, columns, output_queue) if values && columns
        end

        # Execute a retryable operation with proper error handling
        #
        # The block is attempted up to `retries + 1` times. Any error that is
        # still raised after that is logged (with a backtrace when debug
        # logging is enabled) and swallowed.
        #
        # @param job_name [String] Name of the job for logging purposes
        # @yield The block to execute
        # @return [Object, false] the block's result if successful, false otherwise
        def retryable(job_name, &block)
          stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
          stud_try.try((@retries + 1).times) { yield }
        rescue => e
          error_details = {:message => e.message, :cause => e.cause}
          error_details[:backtrace] = e.backtrace if logger.debug?
          logger.error("#{job_name} failed with ", error_details)
          false
        end

        private

        # Process the ESQL response and push events to the output queue
        # @param values [Array[Array]] The ESQL query response rows
        # @param columns [Array[Hash]] The ESQL query response columns
        # @param output_queue [Queue] The queue to push processed events to
        def process_response(values, columns, output_queue)
          values.each do |row|
            mapped_data = map_column_and_values(columns, row)
            @plugin.decorate_and_push_to_queue(output_queue, mapped_data)
          end
        end

        # Map column names to their corresponding values
        # @param columns [Array] Array of column definitions
        # @param values [Array] Array of values for the current row
        # @return [Hash] Mapped data with column names as keys
        def map_column_and_values(columns, values)
          columns.each_with_index.with_object({}) do |(column, index), mapped_data|
            mapped_data[column["name"]] = values[index]
          end
        end
      end
    end
  end
end

logstash-input-elasticsearch.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-elasticsearch'
4-
s.version = '5.1.0'
4+
s.version = '5.2.0'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Reads query results from an Elasticsearch cluster"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# encoding: utf-8
2+
require "logstash/devutils/rspec/spec_helper"
3+
require "logstash/inputs/elasticsearch"
4+
require "elasticsearch"
5+
6+
# Unit specs for the ES|QL executor: construction, retry behaviour, the
# do_run pipeline (query -> warning handling -> row processing) and the
# row/column mapping helpers.
describe LogStash::Inputs::Elasticsearch::Esql do
  let(:client) { instance_double(Elasticsearch::Client) }
  let(:esql_client) { double("esql-client") }
  let(:plugin) { instance_double(LogStash::Inputs::Elasticsearch, params: plugin_config) }
  let(:plugin_config) do
    {
      "query" => "FROM test-index | STATS count() BY field",
      "retries" => 3
    }
  end
  let(:esql_executor) { described_class.new(client, plugin) }

  describe "when initializes" do
    it "sets up the ESQL client with correct parameters" do
      expect(esql_executor.instance_variable_get(:@query)).to eq(plugin_config["query"])
      expect(esql_executor.instance_variable_get(:@retries)).to eq(plugin_config["retries"])
    end
  end

  describe "when faces error while retrying" do
    it "retries the given block the specified number of times" do
      attempts = 0
      result = esql_executor.retryable("Test Job") do
        attempts += 1
        raise StandardError if attempts < 3
        "success"
      end
      expect(attempts).to eq(3)
      expect(result).to eq("success")
    end

    it "returns false if the block fails all attempts" do
      result = esql_executor.retryable("Test Job") do
        raise StandardError
      end
      expect(result).to eq(false)
    end
  end

  describe "when executing chain of processes" do
    let(:output_queue) { Queue.new }
    let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'id'}, { 'name' => 'val'}] } }

    before do
      allow(esql_executor).to receive(:retryable).and_yield
      allow(client).to receive_message_chain(:esql, :query).and_return(response)
      allow(plugin).to receive(:decorate_and_push_to_queue)
    end

    it "executes the ESQL query and processes the results" do
      allow(response).to receive(:headers).and_return({})
      esql_executor.do_run(output_queue)
      expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 'foo', 'val' => 'bar'})
    end

    it "logs a warning if the response contains a warning header" do
      allow(response).to receive(:headers).and_return({"warning" => "some warning"})
      expect(esql_executor.logger).to receive(:warn).with("ES|QL executor received warning", {:message => "some warning"})
      esql_executor.do_run(output_queue)
    end

    it "does not log a warning if the response does not contain a warning header" do
      allow(response).to receive(:headers).and_return({})
      expect(esql_executor.logger).not_to receive(:warn)
      esql_executor.do_run(output_queue)
    end

    it "does not push any event when the query ultimately fails" do
      # a later stub overrides the `and_yield` stub from the before block
      allow(esql_executor).to receive(:retryable).and_return(false)
      esql_executor.do_run(output_queue)
      expect(plugin).not_to have_received(:decorate_and_push_to_queue)
    end
  end

  describe "when starts processing the response" do
    let(:output_queue) { Queue.new }
    let(:values) { [%w[foo bar]] }
    let(:columns) { [{'name' => 'id'}, {'name' => 'val'}] }

    it "processes the ESQL response and pushes events to the output queue" do
      allow(plugin).to receive(:decorate_and_push_to_queue)
      esql_executor.send(:process_response, values, columns, output_queue)
      expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 'foo', 'val' => 'bar'})
    end
  end

  describe "when maps column and values" do
    let(:columns) { [{'name' => 'id'}, {'name' => 'val'}] }
    let(:values) { %w[foo bar] }

    it "maps column names to their corresponding values" do
      result = esql_executor.send(:map_column_and_values, columns, values)
      expect(result).to eq({'id' => 'foo', 'val' => 'bar'})
    end
  end
end

0 commit comments

Comments
 (0)