diff --git a/tutorials/oauth/java/managedidentity/README.md b/tutorials/oauth/java/managedidentity/README.md
index 6b48fcf..f87d72b 100644
--- a/tutorials/oauth/java/managedidentity/README.md
+++ b/tutorials/oauth/java/managedidentity/README.md
@@ -8,7 +8,7 @@ If you don't have an Azure subscription, create a [free account](https://azure.m
In addition:
-* [Java Development Kit (JDK) 1.7+](http://www.oracle.com/technetwork/java/javase/downloads/index.html)
+* [Java Development Kit (JDK) 17+](http://www.oracle.com/technetwork/java/javase/downloads/index.html)
* On Ubuntu, run `apt-get install default-jdk` to install the JDK.
* Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
* [Download](http://maven.apache.org/download.cgi) and [install](http://maven.apache.org/install.html) a Maven binary archive
@@ -72,7 +72,7 @@ Kafka clients need to be configured in a way that they can authenticate with Azu
`sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;`
* Set login callback handler. This is the authentication handler which is responsible to complete oauth flow and return an access token.
- `sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler;`
+ `sasl.login.callback.handler.class=de.microsoft.examples.AzureAuthenticateCallbackHandler;`
## Producer
@@ -89,18 +89,18 @@ bootstrap.servers=mynamespace.servicebus.windows.net:9093 # REPLACE
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
-sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler;
+sasl.login.callback.handler.class=de.microsoft.examples.AzureAuthenticateCallbackHandler;
```
### Run producer from command line
-This sample is configured to send messages to topic `test`, if you would like to change the topic, change the TOPIC constant in `producer/src/main/java/TestProducer.java`.
+This sample is configured to send messages to topic `test`, if you would like to change the topic, change the TOPIC constant in `producer/src/main/java/de/microsoft/examples/TestProducer.java`.
To run the producer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):
```bash
mvn clean package
-mvn exec:java -Dexec.mainClass="TestProducer"
+mvn exec:java -Dexec.mainClass="de.microsoft.examples.TestProducer"
```
The producer will now begin sending events to the Kafka-enabled Event Hub at topic `test` (or whatever topic you chose) and printing the events to stdout.
@@ -122,18 +122,18 @@ request.timeout.ms=60000
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
-sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler;
+sasl.login.callback.handler.class=de.microsoft.examples.AzureAuthenticateCallbackHandler;
```
### Run consumer from command line
-This sample is configured to receive messages from topic `test`, if you would like to change the topic, change the TOPIC constant in `consumer/src/main/java/TestConsumer.java`.
+This sample is configured to receive messages from topic `test`, if you would like to change the topic, change the TOPIC constant in `consumer/src/main/java/de/microsoft/examples/TestConsumer.java`.
To run the producer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):
```bash
mvn clean package
-mvn exec:java -Dexec.mainClass="TestConsumer"
+mvn exec:java -Dexec.mainClass="de.microsoft.examples.TestConsumer"
```
If the Kafka-enabled Event Hub has incoming events (for instance, if your example producer is also running), then the consumer should now begin receiving events from topic `test` (or whatever topic you chose).
diff --git a/tutorials/oauth/java/managedidentity/consumer/.classpath b/tutorials/oauth/java/managedidentity/consumer/.classpath
deleted file mode 100644
index 698778f..0000000
--- a/tutorials/oauth/java/managedidentity/consumer/.classpath
+++ /dev/null
@@ -1,31 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/tutorials/oauth/java/managedidentity/consumer/.project b/tutorials/oauth/java/managedidentity/consumer/.project
deleted file mode 100644
index 00d7e80..0000000
--- a/tutorials/oauth/java/managedidentity/consumer/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-
-
- event-hubs-kafka-java-consumer
-
-
-
-
-
- org.eclipse.jdt.core.javabuilder
-
-
-
-
- org.eclipse.m2e.core.maven2Builder
-
-
-
-
-
- org.eclipse.jdt.core.javanature
- org.eclipse.m2e.core.maven2Nature
-
-
diff --git a/tutorials/oauth/java/managedidentity/consumer/pom.xml b/tutorials/oauth/java/managedidentity/consumer/pom.xml
index b575cd4..909ea9e 100644
--- a/tutorials/oauth/java/managedidentity/consumer/pom.xml
+++ b/tutorials/oauth/java/managedidentity/consumer/pom.xml
@@ -1,6 +1,7 @@
-
+
4.0.0
@@ -12,18 +13,31 @@
UTF-8
UTF-8
+ 17
+ ${java.version}
+ ${java.version}
-
- org.apache.kafka
- kafka_2.12
- 2.3.1
-
- com.microsoft.azure
- azure-client-authentication
- 1.6.15
- compile
+ org.apache.kafka
+ kafka_2.13
+ 3.3.2
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.6.4
+
+
+ com.azure
+ azure-identity
+ 1.7.3
@@ -32,19 +46,12 @@
org.apache.maven.plugins
maven-compiler-plugin
- 3.6.1
-
- 1.7
- 1.7
-
+ 3.10.1
org.apache.maven.plugins
maven-resources-plugin
- 3.0.2
-
- UTF-8
-
+ 3.3.0
diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/CustomAuthenticateCallbackHandler.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/CustomAuthenticateCallbackHandler.java
deleted file mode 100644
index 668d002..0000000
--- a/tutorials/oauth/java/managedidentity/consumer/src/main/java/CustomAuthenticateCallbackHandler.java
+++ /dev/null
@@ -1,74 +0,0 @@
-//Copyright (c) Microsoft Corporation. All rights reserved.
-//Licensed under the MIT License.
-import java.io.IOException;
-import java.net.URI;
-import java.text.ParseException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.AppConfigurationEntry;
-
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
-
-import com.microsoft.azure.credentials.MSICredentials;
-import com.nimbusds.jwt.JWT;
-import com.nimbusds.jwt.JWTClaimsSet;
-import com.nimbusds.jwt.JWTParser;
-
-public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {
-
- final static ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
- final static MSICredentials CREDENTIALS = new MSICredentials();
- // Use AppServiceMSICredentials instead for App Service deployment.
- // final static AppServiceMSICredentials CREDENTIALS = new AppServiceMSICredentials(AzureEnvironment.AZURE);
-
- private String sbUri;
-
- @Override
- public void configure(Map configs, String mechanism, List jaasConfigEntries) {
- String bootstrapServer = Arrays.asList(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).get(0).toString();
- bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", "");
- URI uri = URI.create("https://" + bootstrapServer);
- this.sbUri = uri.getScheme() + "://" + uri.getHost();
- }
-
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- for (Callback callback: callbacks) {
- if (callback instanceof OAuthBearerTokenCallback) {
- try {
- OAuthBearerToken token = getOAuthBearerToken();
- OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
- oauthCallback.token(token);
- } catch (InterruptedException | ExecutionException | TimeoutException | ParseException e) {
- e.printStackTrace();
- }
- } else {
- throw new UnsupportedCallbackException(callback);
- }
- }
- }
-
- OAuthBearerToken getOAuthBearerToken() throws InterruptedException, ExecutionException, TimeoutException, IOException, ParseException
- {
- String accesToken = CREDENTIALS.getToken(sbUri);
- JWT jwt = JWTParser.parse(accesToken);
- JWTClaimsSet claims = jwt.getJWTClaimsSet();
-
- return new OAuthBearerTokenImp(accesToken, claims.getExpirationTime());
- }
-
- public void close() throws KafkaException {
- // NOOP
- }
-}
diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumerThread.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumerThread.java
deleted file mode 100644
index 5ee68f0..0000000
--- a/tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumerThread.java
+++ /dev/null
@@ -1,72 +0,0 @@
-//Copyright (c) Microsoft Corporation. All rights reserved.
-//Licensed under the MIT License.
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import java.io.FileReader;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Properties;
-
-public class TestConsumerThread implements Runnable {
-
- private final String TOPIC;
-
- //Each consumer needs a unique client ID per thread
- private static int id = 0;
-
- public TestConsumerThread(final String TOPIC){
- this.TOPIC = TOPIC;
- }
-
- public void run (){
- final Consumer consumer = createConsumer();
- System.out.println("Polling");
-
- try {
- while (true) {
- final ConsumerRecords consumerRecords = consumer.poll(1000);
- for(ConsumerRecord cr : consumerRecords) {
- System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", cr.key(), cr.value(), cr.partition(), cr.offset());
- }
- consumer.commitAsync();
- }
- } catch (CommitFailedException e) {
- System.out.println("CommitFailedException: " + e);
- } finally {
- consumer.close();
- }
- }
-
- private Consumer createConsumer() {
- try {
- final Properties properties = new Properties();
- synchronized (TestConsumerThread.class) {
- properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "KafkaExampleConsumer#" + id);
- id++;
- }
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- //Get remaining properties from config file
- properties.load(new FileReader("src/main/resources/consumer.config"));
-
- // Create the consumer using properties.
- final Consumer consumer = new KafkaConsumer<>(properties);
-
- // Subscribe to the topic.
- consumer.subscribe(Collections.singletonList(TOPIC));
- return consumer;
-
- } catch (FileNotFoundException e){
- System.out.println("FileNoteFoundException: " + e);
- System.exit(1);
- return null; //unreachable
- } catch (IOException e){
- System.out.println("IOException: " + e);
- System.exit(1);
- return null; //unreachable
- }
- }
-}
diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java
new file mode 100644
index 0000000..8597c0f
--- /dev/null
+++ b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java
@@ -0,0 +1,103 @@
+//Copyright (c) Microsoft Corporation. All rights reserved.
+//Licensed under the MIT License.
+package de.microsoft.examples;
+
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.credential.TokenRequestContext;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.nimbusds.jwt.JWT;
+import com.nimbusds.jwt.JWTParser;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+
+public class AzureAuthenticateCallbackHandler implements AuthenticateCallbackHandler {
+
+ private String requestedScope;
+
+ @Override
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback: callbacks) {
+ if (!(callback instanceof OAuthBearerTokenCallback)) {
+ throw new UnsupportedCallbackException(callback);
+ }
+
+ try {
+ OAuthBearerToken token = getOAuthBearerToken();
+ OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
+ oauthCallback.token(token);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException | TimeoutException | ParseException e) {
+ throw new RuntimeException("Failed to get token from azure.", e);
+ }
+ }
+ }
+
+ private OAuthBearerToken getOAuthBearerToken() throws InterruptedException, ExecutionException,
+ TimeoutException, ParseException {
+ final TokenCredential defaultCredential = new DefaultAzureCredentialBuilder()
+ .build();
+
+ final TokenRequestContext tokenRequestContext = new TokenRequestContext()
+ .addScopes(requestedScope);
+
+ final AccessToken accessToken = defaultCredential
+ .getTokenSync(tokenRequestContext);
+ return mapToOAuthBearerToken(accessToken);
+ }
+
+ private static OAuthBearerTokenImpl mapToOAuthBearerToken(AccessToken value) {
+ try{
+ final JWT jwt = JWTParser.parse(value.getToken());
+ return new OAuthBearerTokenImpl(value.getToken(), jwt.getJWTClaimsSet().getExpirationTime());
+ } catch(ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws KafkaException {
+ // NOOP
+ }
+
+ @Override
+ public void configure(Map configs, String saslMechanism, List jaasConfigEntries) {
+ String bootstrapServer = extractFirstBootstrapServer(configs);
+
+ final String hostname = bootstrapServer.split(":")[0];
+ this.requestedScope = "https://" + hostname + "/.default";
+ }
+
+ private static String extractFirstBootstrapServer(Map configs) {
+ if (!configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ throw new IllegalStateException("Missing bootstrap.servers in kafka configuration.");
+ }
+
+ if (!(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) instanceof List> bootstrapServersList)) {
+ throw new IllegalStateException(
+ "bootstrap.servers in kafka configuration is not a String value");
+ }
+
+ if (bootstrapServersList.size() > 1) {
+ throw new IllegalStateException("More than 1 bootstrap.servers not supported in this example.");
+ }
+
+ if (!(bootstrapServersList.get(0) instanceof String bootstrapServer)) {
+ throw new IllegalStateException("bootstrap.servers has to be a String.");
+ }
+ return bootstrapServer;
+ }
+
+}
diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/java/OAuthBearerTokenImp.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java
similarity index 60%
rename from tutorials/oauth/java/managedidentity/producer/src/main/java/OAuthBearerTokenImp.java
rename to tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java
index 9748b21..8142a7d 100644
--- a/tutorials/oauth/java/managedidentity/producer/src/main/java/OAuthBearerTokenImp.java
+++ b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java
@@ -1,33 +1,36 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
+package de.microsoft.examples;
+
import java.util.Date;
+import java.util.HashSet;
import java.util.Set;
-
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
-public class OAuthBearerTokenImp implements OAuthBearerToken
-{
- String token;
- long lifetimeMs;
-
- public OAuthBearerTokenImp(final String token, Date expiresOn) {
+public class OAuthBearerTokenImpl implements OAuthBearerToken {
+
+ private final String token;
+ private final long lifetimeMs;
+ private final Set scopes = new HashSet<>();
+
+ public OAuthBearerTokenImpl(final String token, final Date expiresOn) {
this.token = token;
this.lifetimeMs = expiresOn.getTime();
}
-
+
@Override
public String value() {
- return this.token;
+ return token;
}
@Override
public Set scope() {
- return null;
+ return scopes;
}
@Override
public long lifetimeMs() {
- return this.lifetimeMs;
+ return lifetimeMs;
}
@Override
@@ -39,4 +42,4 @@ public String principalName() {
public Long startTimeMs() {
return null;
}
-}
\ No newline at end of file
+}
diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumer.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumer.java
similarity index 79%
rename from tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumer.java
rename to tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumer.java
index 5d1d5ca..0c8c0f7 100644
--- a/tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumer.java
+++ b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumer.java
@@ -1,4 +1,4 @@
-//Copyright (c) Microsoft Corporation. All rights reserved.
+package de.microsoft.examples;//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -6,10 +6,10 @@
public class TestConsumer {
//Change constant to send messages to the desired topic
private final static String TOPIC = "test";
-
+
private final static int NUM_THREADS = 1;
- public static void main(String... args) throws Exception {
+ public static void main(String... args) {
final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumerThread.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumerThread.java
new file mode 100644
index 0000000..9a7b9a3
--- /dev/null
+++ b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumerThread.java
@@ -0,0 +1,62 @@
+//Copyright (c) Microsoft Corporation. All rights reserved.
+//Licensed under the MIT License.
+package de.microsoft.examples;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+public class TestConsumerThread implements Runnable {
+
+ private final String topic;
+
+ //Each consumer needs a unique client ID per thread
+ private static int id = 0;
+
+ public TestConsumerThread(final String topic){
+ this.topic = topic;
+ }
+
+ public void run (){
+ try (Consumer consumer = createConsumer()) {
+ System.out.println("Polling");
+
+ while (true) {
+ final ConsumerRecords consumerRecords = consumer.poll(1000);
+ for (ConsumerRecord cr : consumerRecords) {
+ System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", cr.key(), cr.value(),
+ cr.partition(), cr.offset());
+ }
+ consumer.commitAsync();
+ }
+ } catch (CommitFailedException e) {
+ System.out.println("CommitFailedException: " + e);
+ } catch (IOException e) {
+ System.out.println("IOException: " + e);
+ System.exit(1);
+ }
+ }
+
+ private Consumer createConsumer() throws IOException {
+ final Properties properties = new Properties();
+ synchronized (TestConsumerThread.class) {
+ properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "KafkaExampleConsumer#" + id);
+ id++;
+ }
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+ //Get remaining properties from config file
+ properties.load(TestConsumerThread.class.getResourceAsStream("/consumer.config"));
+
+ // Create the consumer using properties.
+ final Consumer consumer = new KafkaConsumer<>(properties);
+
+ // Subscribe to the topic.
+ consumer.subscribe(Collections.singletonList(topic));
+ return consumer;
+ }
+}
diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/resources/consumer.config b/tutorials/oauth/java/managedidentity/consumer/src/main/resources/consumer.config
index 5cc0ab3..a7d0d54 100644
--- a/tutorials/oauth/java/managedidentity/consumer/src/main/resources/consumer.config
+++ b/tutorials/oauth/java/managedidentity/consumer/src/main/resources/consumer.config
@@ -4,4 +4,4 @@ request.timeout.ms=60000
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
-sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler
+sasl.login.callback.handler.class=de.microsoft.examples.AzureAuthenticateCallbackHandler
diff --git a/tutorials/oauth/java/managedidentity/producer/.classpath b/tutorials/oauth/java/managedidentity/producer/.classpath
deleted file mode 100644
index 698778f..0000000
--- a/tutorials/oauth/java/managedidentity/producer/.classpath
+++ /dev/null
@@ -1,31 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/tutorials/oauth/java/managedidentity/producer/.project b/tutorials/oauth/java/managedidentity/producer/.project
deleted file mode 100644
index dae3335..0000000
--- a/tutorials/oauth/java/managedidentity/producer/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-
-
- event-hubs-kafka-java-producer
-
-
-
-
-
- org.eclipse.jdt.core.javabuilder
-
-
-
-
- org.eclipse.m2e.core.maven2Builder
-
-
-
-
-
- org.eclipse.jdt.core.javanature
- org.eclipse.m2e.core.maven2Nature
-
-
diff --git a/tutorials/oauth/java/managedidentity/producer/pom.xml b/tutorials/oauth/java/managedidentity/producer/pom.xml
index 96d1617..943cc8f 100644
--- a/tutorials/oauth/java/managedidentity/producer/pom.xml
+++ b/tutorials/oauth/java/managedidentity/producer/pom.xml
@@ -1,6 +1,7 @@
-
+
4.0.0
@@ -12,18 +13,31 @@
UTF-8
UTF-8
+ 17
+ ${java.version}
+ ${java.version}
-
- org.apache.kafka
- kafka_2.12
- 2.3.1
-
- com.microsoft.azure
- azure-client-authentication
- 1.6.15
- compile
+ org.apache.kafka
+ kafka_2.13
+ 3.3.2
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.6.4
+
+
+ com.azure
+ azure-identity
+ 1.7.3
@@ -32,19 +46,12 @@
org.apache.maven.plugins
maven-compiler-plugin
- 3.6.1
-
- 1.7
- 1.7
-
+ 3.10.1
org.apache.maven.plugins
maven-resources-plugin
- 3.0.2
-
- UTF-8
-
+ 3.3.0
diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/java/CustomAuthenticateCallbackHandler.java b/tutorials/oauth/java/managedidentity/producer/src/main/java/CustomAuthenticateCallbackHandler.java
deleted file mode 100644
index 668d002..0000000
--- a/tutorials/oauth/java/managedidentity/producer/src/main/java/CustomAuthenticateCallbackHandler.java
+++ /dev/null
@@ -1,74 +0,0 @@
-//Copyright (c) Microsoft Corporation. All rights reserved.
-//Licensed under the MIT License.
-import java.io.IOException;
-import java.net.URI;
-import java.text.ParseException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.AppConfigurationEntry;
-
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
-
-import com.microsoft.azure.credentials.MSICredentials;
-import com.nimbusds.jwt.JWT;
-import com.nimbusds.jwt.JWTClaimsSet;
-import com.nimbusds.jwt.JWTParser;
-
-public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {
-
- final static ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
- final static MSICredentials CREDENTIALS = new MSICredentials();
- // Use AppServiceMSICredentials instead for App Service deployment.
- // final static AppServiceMSICredentials CREDENTIALS = new AppServiceMSICredentials(AzureEnvironment.AZURE);
-
- private String sbUri;
-
- @Override
- public void configure(Map configs, String mechanism, List jaasConfigEntries) {
- String bootstrapServer = Arrays.asList(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).get(0).toString();
- bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", "");
- URI uri = URI.create("https://" + bootstrapServer);
- this.sbUri = uri.getScheme() + "://" + uri.getHost();
- }
-
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- for (Callback callback: callbacks) {
- if (callback instanceof OAuthBearerTokenCallback) {
- try {
- OAuthBearerToken token = getOAuthBearerToken();
- OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
- oauthCallback.token(token);
- } catch (InterruptedException | ExecutionException | TimeoutException | ParseException e) {
- e.printStackTrace();
- }
- } else {
- throw new UnsupportedCallbackException(callback);
- }
- }
- }
-
- OAuthBearerToken getOAuthBearerToken() throws InterruptedException, ExecutionException, TimeoutException, IOException, ParseException
- {
- String accesToken = CREDENTIALS.getToken(sbUri);
- JWT jwt = JWTParser.parse(accesToken);
- JWTClaimsSet claims = jwt.getJWTClaimsSet();
-
- return new OAuthBearerTokenImp(accesToken, claims.getExpirationTime());
- }
-
- public void close() throws KafkaException {
- // NOOP
- }
-}
diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java
new file mode 100644
index 0000000..8597c0f
--- /dev/null
+++ b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java
@@ -0,0 +1,103 @@
+//Copyright (c) Microsoft Corporation. All rights reserved.
+//Licensed under the MIT License.
+package de.microsoft.examples;
+
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.credential.TokenRequestContext;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.nimbusds.jwt.JWT;
+import com.nimbusds.jwt.JWTParser;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+
+public class AzureAuthenticateCallbackHandler implements AuthenticateCallbackHandler {
+
+ private String requestedScope;
+
+ @Override
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback: callbacks) {
+ if (!(callback instanceof OAuthBearerTokenCallback)) {
+ throw new UnsupportedCallbackException(callback);
+ }
+
+ try {
+ OAuthBearerToken token = getOAuthBearerToken();
+ OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
+ oauthCallback.token(token);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException | TimeoutException | ParseException e) {
+ throw new RuntimeException("Failed to get token from azure.", e);
+ }
+ }
+ }
+
+ private OAuthBearerToken getOAuthBearerToken() throws InterruptedException, ExecutionException,
+ TimeoutException, ParseException {
+ final TokenCredential defaultCredential = new DefaultAzureCredentialBuilder()
+ .build();
+
+ final TokenRequestContext tokenRequestContext = new TokenRequestContext()
+ .addScopes(requestedScope);
+
+ final AccessToken accessToken = defaultCredential
+ .getTokenSync(tokenRequestContext);
+ return mapToOAuthBearerToken(accessToken);
+ }
+
+ private static OAuthBearerTokenImpl mapToOAuthBearerToken(AccessToken value) {
+ try{
+ final JWT jwt = JWTParser.parse(value.getToken());
+ return new OAuthBearerTokenImpl(value.getToken(), jwt.getJWTClaimsSet().getExpirationTime());
+ } catch(ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws KafkaException {
+ // NOOP
+ }
+
+ @Override
+ public void configure(Map configs, String saslMechanism, List jaasConfigEntries) {
+ String bootstrapServer = extractFirstBootstrapServer(configs);
+
+ final String hostname = bootstrapServer.split(":")[0];
+ this.requestedScope = "https://" + hostname + "/.default";
+ }
+
+ private static String extractFirstBootstrapServer(Map configs) {
+ if (!configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ throw new IllegalStateException("Missing bootstrap.servers in kafka configuration.");
+ }
+
+ if (!(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) instanceof List> bootstrapServersList)) {
+ throw new IllegalStateException(
+ "bootstrap.servers in kafka configuration is not a String value");
+ }
+
+ if (bootstrapServersList.size() > 1) {
+ throw new IllegalStateException("More than 1 bootstrap.servers not supported in this example.");
+ }
+
+ if (!(bootstrapServersList.get(0) instanceof String bootstrapServer)) {
+ throw new IllegalStateException("bootstrap.servers has to be a String.");
+ }
+ return bootstrapServer;
+ }
+
+}
diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/OAuthBearerTokenImp.java b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java
similarity index 60%
rename from tutorials/oauth/java/managedidentity/consumer/src/main/java/OAuthBearerTokenImp.java
rename to tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java
index 9748b21..29317b2 100644
--- a/tutorials/oauth/java/managedidentity/consumer/src/main/java/OAuthBearerTokenImp.java
+++ b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java
@@ -1,33 +1,37 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
+package de.microsoft.examples;
+
import java.util.Date;
+import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
-public class OAuthBearerTokenImp implements OAuthBearerToken
-{
- String token;
- long lifetimeMs;
-
- public OAuthBearerTokenImp(final String token, Date expiresOn) {
+public class OAuthBearerTokenImpl implements OAuthBearerToken {
+
+ private final String token;
+ private final long lifetimeMs;
+ private final Set scopes = new HashSet<>();
+
+ public OAuthBearerTokenImpl(final String token, final Date expiresOn) {
this.token = token;
this.lifetimeMs = expiresOn.getTime();
}
-
+
@Override
public String value() {
- return this.token;
+ return token;
}
@Override
public Set scope() {
- return null;
+ return scopes;
}
@Override
public long lifetimeMs() {
- return this.lifetimeMs;
+ return lifetimeMs;
}
@Override
@@ -39,4 +43,4 @@ public String principalName() {
public Long startTimeMs() {
return null;
}
-}
\ No newline at end of file
+}
diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/java/TestDataReporter.java b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestDataReporter.java
similarity index 54%
rename from tutorials/oauth/java/managedidentity/producer/src/main/java/TestDataReporter.java
rename to tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestDataReporter.java
index 7e69439..3924751 100644
--- a/tutorials/oauth/java/managedidentity/producer/src/main/java/TestDataReporter.java
+++ b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestDataReporter.java
@@ -1,39 +1,36 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
-import org.apache.kafka.clients.producer.Callback;
+package de.microsoft.examples;
+
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import java.sql.Timestamp;
public class TestDataReporter implements Runnable {
private static final int NUM_MESSAGES = 100;
- private final String TOPIC;
- private Producer producer;
+ private final String topic;
+ private final Producer producer;
- public TestDataReporter(final Producer producer, String TOPIC) {
+ public TestDataReporter(final Producer producer, String topic) {
this.producer = producer;
- this.TOPIC = TOPIC;
+ this.topic = topic;
}
@Override
public void run() {
- for(int i = 0; i < NUM_MESSAGES; i++) {
+ for(int i = 0; i < NUM_MESSAGES; i++) {
long time = System.currentTimeMillis();
System.out.println("Test Data #" + i + " from thread #" + Thread.currentThread().getId());
-
- final ProducerRecord record = new ProducerRecord(TOPIC, time, "Test Data #" + i);
- producer.send(record, new Callback() {
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception != null) {
- System.out.println(exception);
- System.exit(1);
- }
+
+ final ProducerRecord record = new ProducerRecord(topic, time, "Test Data #" + i);
+ producer.send(record, (metadata, exception) -> {
+ if (exception != null) {
+ System.out.println(exception);
+ System.exit(1);
}
});
}
System.out.println("Finished sending " + NUM_MESSAGES + " messages from thread #" + Thread.currentThread().getId() + "!");
}
-}
\ No newline at end of file
+}
diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/java/TestProducer.java b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestProducer.java
similarity index 55%
rename from tutorials/oauth/java/managedidentity/producer/src/main/java/TestProducer.java
rename to tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestProducer.java
index ca2cec2..7494546 100644
--- a/tutorials/oauth/java/managedidentity/producer/src/main/java/TestProducer.java
+++ b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestProducer.java
@@ -1,12 +1,14 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.
+package de.microsoft.examples;
+
+import java.io.IOException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
-import java.io.FileReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -14,11 +16,11 @@
public class TestProducer {
//Change constant to send messages to the desired topic, for this example we use 'test'
private final static String TOPIC = "test";
-
+
private final static int NUM_THREADS = 1;
- public static void main(String... args) throws Exception {
+ public static void main(String... args) throws IOException {
//Create Kafka Producer
final Producer producer = createProducer();
@@ -29,19 +31,13 @@ public static void main(String... args) throws Exception {
executorService.execute(new TestDataReporter(producer, TOPIC));
}
- private static Producer createProducer() {
- try{
- Properties properties = new Properties();
- properties.load(new FileReader("src/main/resources/producer.config"));
- properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- return new KafkaProducer<>(properties);
- } catch (Exception e){
- System.out.println("Failed to create producer with exception: " + e);
- System.exit(0);
- return null; //unreachable
- }
+ private static Producer createProducer() throws IOException {
+ Properties properties = new Properties();
+ properties.load(TestProducer.class.getResourceAsStream("/producer.config"));
+ properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ return new KafkaProducer<>(properties);
}
}
diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/resources/producer.config b/tutorials/oauth/java/managedidentity/producer/src/main/resources/producer.config
index 3ea6785..bb8cfff 100644
--- a/tutorials/oauth/java/managedidentity/producer/src/main/resources/producer.config
+++ b/tutorials/oauth/java/managedidentity/producer/src/main/resources/producer.config
@@ -2,4 +2,4 @@ bootstrap.servers=mynamespace.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
-sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler
+sasl.login.callback.handler.class=de.microsoft.examples.AzureAuthenticateCallbackHandler