diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc
index 5c37688..e4408f9 100644
--- a/docs/input-kafka.asciidoc
+++ b/docs/input-kafka.asciidoc
@@ -131,6 +131,13 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
 | <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-sasl_client_callback_handler_class>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-sasl_login_callback_handler_class>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms>> |<<number,number>>|No
+| <<plugins-{type}s-{plugin}-sasl_login_read_timeout_ms>> |<<number,number>>|No
+| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms>> |<<number,number>>|No
+| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
@@ -556,13 +563,62 @@ retries are exhausted.
 The amount of time to wait before attempting to retry a failed fetch request
 to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.
 
-[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""]
+[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class"]
 ===== `sasl_client_callback_handler_class`
-* Value type is <<string,string>>
-* There is no default value for this setting.
+  * Value type is <<string,string>>
+  * There is no default value for this setting.
 
 The SASL client callback handler class the specified SASL mechanism should use.
 
+[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url"]
+===== `sasl_oauthbearer_token_endpoint_url`
+  * Value type is <<string,string>>
+  * There is no default value for this setting.
+
+The URL for the OAuth 2.0 issuer token endpoint.
+
+[id="plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name"]
+===== `sasl_oauthbearer_scope_claim_name`
+  * Value type is <<string,string>>
+  * Default value is `"scope"`
+
+(optional) The override name of the scope claim.
+
+[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class"]
+===== `sasl_login_callback_handler_class`
+  * Value type is <<string,string>>
+  * There is no default value for this setting.
+
+The SASL login callback handler class the specified SASL mechanism should use.
+
+[id="plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms"]
+===== `sasl_login_connect_timeout_ms`
+  * Value type is <<number,number>>
+  * Default value is `10000` milliseconds.
+
+(optional) The duration, in milliseconds, for HTTPS connect timeout
+
+[id="plugins-{type}s-{plugin}-sasl_login_read_timeout_ms"]
+===== `sasl_login_read_timeout_ms`
+  * Value type is <<number,number>>
+  * Default value is `10000` milliseconds.
+
+(optional) The duration, in milliseconds, for HTTPS read timeout.
+
+[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms"]
+===== `sasl_login_retry_backoff_ms`
+  * Value type is <<number,number>>
+  * Default value is `100` milliseconds.
+
+(optional) The duration, in milliseconds, to wait between HTTPS call attempts.
+
+[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms"]
+===== `sasl_login_retry_backoff_max_ms`
+  * Value type is <<number,number>>
+  * Default value is `10000` milliseconds.
+
+(optional) The maximum duration, in milliseconds, for HTTPS call attempts.
+
 [id="plugins-{type}s-{plugin}-sasl_jaas_config"]
 ===== `sasl_jaas_config` 
 
diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc
index 716d3dd..0a1c53b 100644
--- a/docs/output-kafka.asciidoc
+++ b/docs/output-kafka.asciidoc
@@ -102,6 +102,13 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
 | <<plugins-{type}s-{plugin}-retries>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-sasl_client_callback_handler_class>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-sasl_login_callback_handler_class>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms>> |<<number,number>>|No
+| <<plugins-{type}s-{plugin}-sasl_login_read_timeout_ms>> |<<number,number>>|No
+| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms>> |<<number,number>>|No
+| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
@@ -392,13 +399,62 @@ In versions prior to 10.5.0, any exception is retried indefinitely unless the `r
 
 The amount of time to wait before attempting to retry a failed produce request to a given topic partition.
 
-[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""]
+[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class"]
 ===== `sasl_client_callback_handler_class`
-* Value type is <<string,string>>
-* There is no default value for this setting.
+  * Value type is <<string,string>>
+  * There is no default value for this setting.
 
 The SASL client callback handler class the specified SASL mechanism should use.
 
+[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url"]
+===== `sasl_oauthbearer_token_endpoint_url`
+  * Value type is <<string,string>>
+  * There is no default value for this setting.
+
+The URL for the OAuth 2.0 issuer token endpoint.
+
+[id="plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name"]
+===== `sasl_oauthbearer_scope_claim_name`
+  * Value type is <<string,string>>
+  * Default value is `"scope"`
+
+(optional) The override name of the scope claim.
+
+[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class"]
+===== `sasl_login_callback_handler_class`
+  * Value type is <<string,string>>
+  * There is no default value for this setting.
+
+The SASL login callback handler class the specified SASL mechanism should use.
+
+[id="plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms"]
+===== `sasl_login_connect_timeout_ms`
+  * Value type is <<number,number>>
+  * Default value is `10000` milliseconds.
+
+(optional) The duration, in milliseconds, for HTTPS connect timeout
+
+[id="plugins-{type}s-{plugin}-sasl_login_read_timeout_ms"]
+===== `sasl_login_read_timeout_ms`
+  * Value type is <<number,number>>
+  * Default value is `10000` milliseconds.
+
+(optional) The duration, in milliseconds, for HTTPS read timeout.
+
+[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms"]
+===== `sasl_login_retry_backoff_ms`
+  * Value type is <<number,number>>
+  * Default value is `100` milliseconds.
+
+(optional) The duration, in milliseconds, to wait between HTTPS call attempts.
+
+[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms"]
+===== `sasl_login_retry_backoff_max_ms`
+  * Value type is <<number,number>>
+  * Default value is `10000` milliseconds.
+
+(optional) The maximum duration, in milliseconds, for HTTPS call attempts.
+
 [id="plugins-{type}s-{plugin}-sasl_jaas_config"]
 ===== `sasl_jaas_config` 
 
diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb
index 604bc68..4dca75f 100644
--- a/lib/logstash/inputs/kafka.rb
+++ b/lib/logstash/inputs/kafka.rb
@@ -210,6 +210,20 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
   config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT"
   # SASL client callback handler class
   config :sasl_client_callback_handler_class, :validate => :string
+  # The URL for the OAuth 2.0 issuer token endpoint.
+  config :sasl_oauthbearer_token_endpoint_url, :validate => :string
+  # (optional) The override name of the scope claim.
+  config :sasl_oauthbearer_scope_claim_name, :validate => :string, :default => 'scope'
+  # SASL login callback handler class
+  config :sasl_login_callback_handler_class, :validate => :string
+  # (optional) The duration, in milliseconds, for HTTPS connect timeout
+  config :sasl_login_connect_timeout_ms, :validate => :number, :default => 10000
+  # (optional) The duration, in milliseconds, for HTTPS read timeout.
+  config :sasl_login_read_timeout_ms, :validate => :number, :default => 10000
+  # (optional) The duration, in milliseconds, to wait between HTTPS call attempts.
+  config :sasl_login_retry_backoff_ms, :validate => :number, :default => 100
+  # (optional) The maximum duration, in milliseconds, for HTTPS call attempts.
+  config :sasl_login_retry_backoff_max_ms, :validate => :number, :default => 10000
   # http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections. 
   # This may be any mechanism for which a security provider is available.
   # GSSAPI is the default mechanism.
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index ebf233f..b2e3ba5 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -149,6 +149,20 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT"
   # SASL client callback handler class
   config :sasl_client_callback_handler_class, :validate => :string
+  # The URL for the OAuth 2.0 issuer token endpoint.
+  config :sasl_oauthbearer_token_endpoint_url, :validate => :string
+  # (optional) The override name of the scope claim.
+  config :sasl_oauthbearer_scope_claim_name, :validate => :string, :default => 'scope'
+  # SASL login callback handler class
+  config :sasl_login_callback_handler_class, :validate => :string
+  # (optional) The duration, in milliseconds, for HTTPS connect timeout
+  config :sasl_login_connect_timeout_ms, :validate => :number, :default => 10000
+  # (optional) The duration, in milliseconds, for HTTPS read timeout.
+  config :sasl_login_read_timeout_ms, :validate => :number, :default => 10000
+  # (optional) The duration, in milliseconds, to wait between HTTPS call attempts.
+  config :sasl_login_retry_backoff_ms, :validate => :number, :default => 100
+  # (optional) The maximum duration, in milliseconds, for HTTPS call attempts.
+  config :sasl_login_retry_backoff_max_ms, :validate => :number, :default => 10000
   # http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections. 
   # This may be any mechanism for which a security provider is available.
   # GSSAPI is the default mechanism.
diff --git a/lib/logstash/plugin_mixins/kafka/common.rb b/lib/logstash/plugin_mixins/kafka/common.rb
index 1ae8546..6564e80 100644
--- a/lib/logstash/plugin_mixins/kafka/common.rb
+++ b/lib/logstash/plugin_mixins/kafka/common.rb
@@ -42,6 +42,13 @@ def set_sasl_config(props)
       props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
       props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil?
       props.put("sasl.client.callback.handler.class", sasl_client_callback_handler_class) unless sasl_client_callback_handler_class.nil?
+      props.put("sasl.oauthbearer.token.endpoint.url", sasl_oauthbearer_token_endpoint_url) unless sasl_oauthbearer_token_endpoint_url.nil?
+      props.put("sasl.oauthbearer.scope.claim.name", sasl_oauthbearer_scope_claim_name) unless sasl_oauthbearer_scope_claim_name.nil?
+      props.put("sasl.login.callback.handler.class", sasl_login_callback_handler_class) unless sasl_login_callback_handler_class.nil?
+      props.put("sasl.login.connect.timeout.ms", sasl_login_connect_timeout_ms.to_s) unless sasl_login_connect_timeout_ms.nil?
+      props.put("sasl.login.read.timeout.ms", sasl_login_read_timeout_ms.to_s) unless sasl_login_read_timeout_ms.nil?
+      props.put("sasl.login.retry.backoff.ms", sasl_login_retry_backoff_ms.to_s) unless sasl_login_retry_backoff_ms.nil?
+      props.put("sasl.login.retry.backoff.max.ms", sasl_login_retry_backoff_max_ms.to_s) unless sasl_login_retry_backoff_max_ms.nil?
     end
 
     def reassign_dns_lookup