Skip to content

Commit

Permalink
sasl.oauthbearer.token.endpoint.url Kafka client setting to set oauth…
Browse files Browse the repository at this point in the history
…bearer token endpoint url
  • Loading branch information
roon-replica committed Sep 12, 2024
1 parent 7786c5e commit 56f8c92
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 0 deletions.
16 changes: 16 additions & 0 deletions docs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
| <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_client_callback_handler_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_login_callback_handler_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
Expand Down Expand Up @@ -563,6 +565,20 @@ to a given topic partition. This avoids repeated fetching-and-failing in a tight

The SASL client callback handler class the specified SASL mechanism should use.

[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class""]
===== `sasl_login_callback_handler_class`
* Value type is <<string,string>>
* There is no default value for this setting.

The SASL login callback handler class the specified SASL mechanism should use.

[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url""]
===== `sasl_oauthbearer_token_endpoint_url`
* Value type is <<string,string>>
* There is no default value for this setting.

The URL where the Kafka client requests OAuth 2.0 tokens from an authorization server for integration with OAuth 2.0 identity providers.

[id="plugins-{type}s-{plugin}-sasl_jaas_config"]
===== `sasl_jaas_config`

Expand Down
16 changes: 16 additions & 0 deletions docs/output-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
| <<plugins-{type}s-{plugin}-retries>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-sasl_client_callback_handler_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_login_callback_handler_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
Expand Down Expand Up @@ -399,6 +401,20 @@ The amount of time to wait before attempting to retry a failed produce request t

The SASL client callback handler class the specified SASL mechanism should use.

[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class""]
===== `sasl_login_callback_handler_class`
* Value type is <<string,string>>
* There is no default value for this setting.

The SASL login callback handler class the specified SASL mechanism should use.

[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url""]
===== `sasl_oauthbearer_token_endpoint_url`
* Value type is <<string,string>>
* There is no default value for this setting.

The URL where the Kafka client requests OAuth 2.0 tokens from an authorization server for integration with OAuth 2.0 identity providers.

[id="plugins-{type}s-{plugin}-sasl_jaas_config"]
===== `sasl_jaas_config`

Expand Down
4 changes: 4 additions & 0 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT"
# SASL client callback handler class
config :sasl_client_callback_handler_class, :validate => :string
# SASL login callback handler class
config :sasl_login_callback_handler_class, :validate => :string
# The URL where the Kafka client requests OAuth 2.0 tokens from an authorization server.
config :sasl_oauthbearer_token_endpoint_url, :validate => :string
# http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections.
# This may be any mechanism for which a security provider is available.
# GSSAPI is the default mechanism.
Expand Down
6 changes: 6 additions & 0 deletions lib/logstash/outputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT"
# SASL client callback handler class
config :sasl_client_callback_handler_class, :validate => :string
# SASL login callback handler class
config :sasl_login_callback_handler_class, :validate => :string
# The URL where the Kafka client requests OAuth 2.0 tokens from an authorization server.
config :sasl_oauthbearer_token_endpoint_url, :validate => :string
# http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections.
# This may be any mechanism for which a security provider is available.
# GSSAPI is the default mechanism.
Expand Down Expand Up @@ -363,6 +367,8 @@ def create_producer
props.put(kafka::VALUE_SERIALIZER_CLASS_CONFIG, value_serializer)

props.put("security.protocol", security_protocol) unless security_protocol.nil?
props.put("sasl.login.callback.handler.class", sasl_login_callback_handler_class) unless sasl_login_callback_handler_class.nil?
props.put("sasl.oauthbearer.token.endpoint.url", sasl_oauthbearer_token_endpoint_url) unless sasl_oauthbearer_token_endpoint_url.nil?

if security_protocol == "SSL"
set_trustore_keystore_config(props)
Expand Down
2 changes: 2 additions & 0 deletions lib/logstash/plugin_mixins/kafka/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def set_sasl_config(props)
props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil?
props.put("sasl.client.callback.handler.class", sasl_client_callback_handler_class) unless sasl_client_callback_handler_class.nil?
props.put("sasl.login.callback.handler.class", sasl_login_callback_handler_class) unless sasl_login_callback_handler_class.nil?
props.put("sasl.oauthbearer.token.endpoint.url", sasl_oauthbearer_token_endpoint_url) unless sasl_oauthbearer_token_endpoint_url.nil?
end

def reassign_dns_lookup
Expand Down

0 comments on commit 56f8c92

Please sign in to comment.