Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,42 +1,66 @@
package io.conduktor.demos.kafka.wikimedia;

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.EventSource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
package io.conducktor.kafka.wiki;

import java.net.URI;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class WikimediaChangesProducer {
import com.launchdarkly.okhttp.eventsource.EventHandler; // ← 3.x package (with okhttp)
import com.launchdarkly.okhttp.eventsource.EventSource; // ← 3.x package (with okhttp)
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

/**
* Streams Wikimedia RecentChange SSE and forwards lines to a Kafka topic.
* Requires:
* - Kafka broker at localhost:9092
* - Dependency: com.launchdarkly:okhttp-eventsource:3.x (classic EventHandler API)
* - Send a proper User-Agent header to avoid 403 from Wikimedia
*/
public class WikimediaChangesProducer {
public static void main(String[] args) throws InterruptedException {

// create Producer Properties
// Kafka producer config
final String bootstrapServers = "localhost:9092";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// create the Producer
// Create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

String topic = "wikimedia.recentchange";
// Target topic
final String topic = "wikimedia.recentchange";

// Wikimedia requires a meaningful User-Agent
Headers requestHeaders = new Headers.Builder()
.add("User-Agent", "user-wiki-stream/1.0 ([email protected])")
.build();

// Create event handler to send SSE messages to Kafka
OkHttpClient client = new OkHttpClient();
EventHandler eventHandler = new WikimediaChangeHandler(producer, topic);
String url = "https://stream.wikimedia.org/v2/stream/recentchange";
EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
EventSource eventSource = builder.build();

EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url))
.client(client)
.headers(requestHeaders) // 3.x expects a single Headers object
.reconnectTime(Duration.ofSeconds(3));

// start the producer in another thread
EventSource eventSource = builder.build();

// Start the SSE client in another thread
eventSource.start();

// we produce for 10 minutes and block the program until then
// Stream for 10 minutes
TimeUnit.MINUTES.sleep(10);


// Graceful shutdown
eventSource.close();
producer.flush();
producer.close();
}
}