@@ -39,7 +39,11 @@ class KafkaClient {
3939 // If SSL keys are provided, disable SASL
4040 if ( this . config . ssl && ( this . config . ssl . ca || this . config . ssl . cert || this . config . ssl . key ) ) {
4141 this . config . useSasl = false ;
42- }
42+ }
43+
44+ if ( this . config . vendor === 'aiven' && this . config . ssl . ca && ! this . config . ssl . cert && ! this . config . ssl . key ) {
45+ this . config . useSasl = true ;
46+ }
4347
4448 // Handle authentication based on vendor and configuration
4549 if ( this . config . useSasl && this . config . sasl ) {
@@ -180,52 +184,25 @@ class KafkaClient {
180184 return true ;
181185
182186 case 'aiven' :
183- // Aiven uses SASL_SSL with SCRAM-SHA-256 or OAuth
184- console . log ( '🔐 Aiven detected - configuring SSL with certificates...' ) ;
185- if ( this . config . ssl && ( this . config . ssl . ca || this . config . ssl . cert || this . config . ssl . key ) ) {
186- kafkaConfig . ssl = await this . buildSslConfig ( ) ;
187- }
187+ // Aiven supports both SSL with certificates and SASL_SSL with username/password
188+ const hasSaslCredentials = this . config . sasl && this . config . sasl . username && this . config . sasl . password ;
189+ const hasSslCertificates = this . config . ssl && ( this . config . ssl . cert || this . config . ssl . key ) ;
188190
189- if ( mechanism === 'oauthbearer' && useOIDC ) {
190- const oidcProvider = await createOIDCProvider ( 'oidc' , {
191- ...this . config . sasl ,
192- discoveryUrl : this . config . sasl . discoveryUrl ,
193- clientId : this . config . sasl . clientId ,
194- clientSecret : this . config . sasl . clientSecret ,
195- tokenHost : this . config . sasl . host || this . config . sasl . tokenHost ,
196- tokenPath : this . config . sasl . path || this . config . sasl . tokenPath ,
197- scope : this . config . sasl . scope ,
198- audience : this . config . sasl . audience ,
199- validateToken : this . config . sasl . validateToken
200- } ) ;
201-
202- kafkaConfig . sasl = {
203- mechanism : 'oauthbearer' ,
204- oauthBearerProvider : async ( ) => {
205- return await oidcProvider . getToken ( ) ;
206- }
207- } ;
208- } else if ( mechanism === 'oauthbearer' ) {
209- // Legacy OAuth support
210- const oauthProvider = createOAuthProvider ( 'generic' , {
211- clientId : this . config . sasl . clientId ,
212- clientSecret : this . config . sasl . clientSecret ,
213- tokenHost : this . config . sasl . host || this . config . sasl . tokenHost ,
214- tokenPath : this . config . sasl . path || this . config . sasl . tokenPath
215- } ) ;
216-
217- kafkaConfig . sasl = {
218- mechanism : 'oauthbearer' ,
219- oauthBearerProvider : async ( ) => {
220- return await oauthProvider . getToken ( ) ;
221- }
222- } ;
223- } else {
191+ if ( hasSaslCredentials ) {
192+ // SASL_SSL mode with username/password
193+ console . log ( '🔐 Aiven detected - configuring SASL_SSL with username/password...' ) ;
194+ kafkaConfig . ssl = await this . buildSslConfig ( ) ;
224195 kafkaConfig . sasl = {
225196 mechanism : 'scram-sha-256' , // Aiven typically uses SCRAM-SHA-256
226197 username : this . config . sasl . username ,
227198 password : this . config . sasl . password
228199 } ;
200+ } else if ( hasSslCertificates ) {
201+ // SSL mode with client certificates
202+ console . log ( '🔐 Aiven detected - configuring SSL with client certificates...' ) ;
203+ kafkaConfig . ssl = await this . buildSslConfig ( ) ;
204+ } else {
205+ throw new Error ( 'Aiven configuration requires either SASL credentials (username/password) or SSL certificates (cert/key)' ) ;
229206 }
230207 break ;
231208
0 commit comments