diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 40b7412ef..329be0262 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -91,6 +91,7 @@ ssl.certificate.verify_cb | * | |
sasl.mechanisms | * | | GSSAPI | high | SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. **NOTE**: Despite the name only one mechanism must be configured.
*Type: string*
sasl.mechanism | * | | GSSAPI | high | Alias for `sasl.mechanisms`: SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. **NOTE**: Despite the name only one mechanism must be configured.
*Type: string*
sasl.kerberos.service.name | * | | kafka | low | Kerberos principal name that Kafka runs as, not including /hostname@REALM
*Type: string*
+sasl.kerberos.domain.name | * | | | low | Override for the broker hostname part in the service principal (the portion after `service/`). Use this if the Kerberos ticket should target a fixed FQDN instead of the broker's advertised hostname.
*Type: string*
sasl.kerberos.principal | * | | kafkaclient | low | This client's Kerberos principal name. (Not supported on Windows, will use the logon user's principal).
*Type: string*
sasl.kerberos.kinit.cmd | * | | kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} \|\| kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} | low | Shell command to refresh or acquire the client's Kerberos ticket. This command is executed on client creation and every sasl.kerberos.min.time.before.relogin (0=disable). %{config.prop.name} is replaced by corresponding config object value.
*Type: string*
sasl.kerberos.keytab | * | | | low | Path to Kerberos keytab file. This configuration property is only used as a variable in `sasl.kerberos.kinit.cmd` as ` ... -t "%{sasl.kerberos.keytab}"`.
*Type: string*
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 1f8bbf106..0dea1ada7 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -996,6 +996,12 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"Kerberos principal name that Kafka runs as, "
"not including /hostname@REALM",
.sdef = "kafka"},
+ {_RK_GLOBAL, "sasl.kerberos.domain.name", _RK_C_STR,
+ _RK(sasl.domain_name),
+ "Override for the broker hostname portion of the service principal "
+ "(the string after \"service/\"). Set this if the Kerberos ticket "
+ "should always target a fixed FQDN regardless of the broker's "
+ "advertised hostname."},
{_RK_GLOBAL, "sasl.kerberos.principal", _RK_C_STR, _RK(sasl.principal),
"This client's Kerberos principal name. "
"(Not supported on Windows, will use the logon user's principal).",
diff --git a/src/rdkafka_sasl_cyrus.c b/src/rdkafka_sasl_cyrus.c
index 89ff15c42..597956461 100644
--- a/src/rdkafka_sasl_cyrus.c
+++ b/src/rdkafka_sasl_cyrus.c
@@ -522,6 +522,7 @@ static int rd_kafka_sasl_cyrus_client_new(rd_kafka_transport_t *rktrans,
rktrans},
{SASL_CB_CANON_USER, (void *)rd_kafka_sasl_cyrus_cb_canon, rktrans},
{SASL_CB_LIST_END}};
+ const char *principal_host;
state = rd_calloc(1, sizeof(*state));
rktrans->rktrans_sasl.state = state;
@@ -544,8 +545,13 @@ static int rd_kafka_sasl_cyrus_client_new(rd_kafka_transport_t *rktrans,
memcpy(state->callbacks, callbacks, sizeof(callbacks));
+ principal_host = (rk->rk_conf.sasl.domain_name &&
+ *rk->rk_conf.sasl.domain_name)
+ ? rk->rk_conf.sasl.domain_name
+ : hostname;
+
mtx_lock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
- r = sasl_client_new(rk->rk_conf.sasl.service_name, hostname, NULL,
+ r = sasl_client_new(rk->rk_conf.sasl.service_name, principal_host, NULL,
NULL, /* no local & remote IP checks */
state->callbacks, 0, &state->conn);
mtx_unlock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
diff --git a/src/rdkafka_sasl_win32.c b/src/rdkafka_sasl_win32.c
index b968bcece..5a623bf88 100644
--- a/src/rdkafka_sasl_win32.c
+++ b/src/rdkafka_sasl_win32.c
@@ -512,9 +512,18 @@ static int rd_kafka_sasl_win32_client_new(rd_kafka_transport_t *rktrans,
state = rd_calloc(1, sizeof(*state));
rktrans->rktrans_sasl.state = state;
- _snwprintf(state->principal, RD_ARRAYSIZE(state->principal), L"%hs/%hs",
- rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.service_name,
- hostname);
+ {
+ const char *principal_host;
+ if (rk->rk_conf.sasl.domain_name &&
+ *rk->rk_conf.sasl.domain_name)
+ principal_host = rk->rk_conf.sasl.domain_name;
+ else
+ principal_host = hostname;
+
+ _snwprintf(state->principal, RD_ARRAYSIZE(state->principal),
+ L"%hs/%hs", rk->rk_conf.sasl.service_name,
+ principal_host);
+ }
state->cred = rd_kafka_sasl_sspi_cred_new(rktrans, errstr, errstr_size);
if (!state->cred)