diff --git a/README.md b/README.md
index f4ec9bc..9a35d5d 100644
--- a/README.md
+++ b/README.md
@@ -188,7 +188,7 @@ There is a IoT device counting the numbers of different events in a zone (for ex
 * **WINDOW_TIME_SIZE** the size (time) of the window
 * **ALLOWED_LATENESS_TIME** the time that an already processed window can be reprocessed if an event belonging to this window arraives
 * **MAX_OUT_OF_ORDERNESS_MS** the maximum time it takes for a window to launch for the first time, allows waiting for messy events to arrive without getting lost
-* WATERMARK_INTERVAL_MS the interval (time) in which the watermark will be updated
+* **WATERMARK_INTERVAL_MS** the interval (time) in which the watermark will be updated
 * **BOOTSTRAP_SERVERS** the kafka broker list
 * **GROUP_ID** the consumer group name
 * **SOURCE_TOPIC** the raw events kafka topic
@@ -217,17 +217,17 @@ The processed events are sent back to a Kafka topic (*SINK_TOPIC*), consumed by
     LOCATION "/processed_events"
 ```
 where *LOCATION* path is the HDFS location where processed events are stored.
- 
+
 ## Fifth one (Flink broadcast state)
- 
+
 There is IoT device sending messages to Kafka in Avro type with the value of some metric that has to be within some limits:
- 
+
 ```avro
 {
   "namespace": "net.zylklab.flink.sandbox.broadcaststate.pojo",
   "type": "record",
   "name": "Event",
-  "fields": 
+  "fields":
   [
     {"name": "var_id", "type": ["null","string"], "default": null},
     {"name": "var_name", "type": ["null","string"], "default": null},
@@ -244,7 +244,7 @@ These value's max and min limit could change over time so there is another Kafka
   "namespace": "net.zylklab.flink.sandbox.broadcaststate.pojo",
   "type": "record",
   "name": "Limit",
-  "fields": 
+  "fields":
   [
     {"name": "var_id", "type": ["null","string"], "default": null},
     {"name": "var_name", "type": ["null","string"], "default": null},
@@ -256,3 +256,22 @@ These value's max and min limit could change over time so there is another Kafka
 The Flink job consumes from both topics and process events comparing its values with its limits. When a new limit is published to Kafka, the job updates the limit without the need of restarting the job.
+
+## Order an unordered stream of events
+
+There is an IoT device counting the number of different events in a zone (for example, the number of cars, bicycles and motorbikes crossing a point). These events are sent to a Kafka topic, serialized as Avro events; here they are simulated by a NiFi flow that publishes to the topic. The events are not produced in order, which can be a problem for various downstream processes.
+
+The class `net.zylklab.flink.sandbox.unordered_events.UnorderedEventsJob` defines a Flink job that consumes unordered data from a Kafka topic, orders it and sends the ordered data back to another Kafka topic. To do so, it uses the `net.zylklab.flink.sandbox.unordered_events.BufferedKeyedProcessFunction` class.
+
+The main parameters to take into account are located in `net.zylklab.flink.sandbox.unordered_events.UnorderedEventsJob`:
+
+```java
+private static final Integer WATERMARK_INTERVAL_MS = 500;
+private static final Integer MAX_OUT_OF_ORDERNESS_MS = 1000;
+private static final Integer MAX_WAIT_FOR_EVENTS_SEC = 60;
+```
+
+where
+
+- **WATERMARK_INTERVAL_MS**: the interval (time) at which the watermark is updated.
+- **MAX_OUT_OF_ORDERNESS_MS**: the maximum expected out-of-orderness; the watermark lags this far behind the highest timestamp seen so far, so out-of-order events can still arrive without being dropped.
+- **MAX_WAIT_FOR_EVENTS_SEC**: when consuming from a Kafka topic with more than one partition, a partition may stop sending events (this usually happens when the topic is partitioned by key). This parameter defines how long the job waits for new data from each partition before marking it idle, as shown in the sketch below.
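+
+These parameters are wired into the watermark strategy of the Kafka source. The snippet below is a condensed sketch of what `UnorderedEventsJob` does (see the full source for details):
+
+```java
+// Emit a new watermark every WATERMARK_INTERVAL_MS milliseconds
+env.getConfig().setAutoWatermarkInterval(WATERMARK_INTERVAL_MS);
+
+FlinkKafkaConsumerBase<Event> source = new FlinkKafkaConsumer<>(SOURCE_TOPIC, new AvroDeserializationSchema<>(Event.class), props)
+    .assignTimestampsAndWatermarks(WatermarkStrategy
+        .<Event>forBoundedOutOfOrderness(Duration.ofMillis(MAX_OUT_OF_ORDERNESS_MS)) // tolerate up to 1 s of disorder
+        .withTimestampAssigner((event, ts) -> event.getTs())                         // event time comes from the event itself
+        .withIdleness(Duration.ofSeconds(MAX_WAIT_FOR_EVENTS_SEC)));                 // idle partitions do not hold back the watermark
+```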
diff --git a/pom.xml b/pom.xml
index f697705..b281ac9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,7 +13,8 @@
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<flink.version>1.9.1</flink.version>
+		<!-- Flink 1.11 brings the WatermarkStrategy API used by the unordered-events job -->
+		<flink.version>1.11.1</flink.version>
 		<scala.version>2.11</scala.version>
 		<slf4j.version>1.7.7</slf4j.version>
 		<log4j.version>1.2.17</log4j.version>
@@ -52,6 +53,13 @@
 			<version>${flink.version}</version>
 		</dependency>
+
+		<!-- Since Flink 1.11, flink-clients is no longer a transitive dependency of the streaming API and must be declared explicitly -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_2.11</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
 		<dependency>
 			<groupId>org.slf4j</groupId>
diff --git a/src/main/java/net/zylklab/flink/sandbox/unordered_events/BufferedKeyedProcessFunction.java b/src/main/java/net/zylklab/flink/sandbox/unordered_events/BufferedKeyedProcessFunction.java
new file mode 100644
index 0000000..ee0c096
--- /dev/null
+++ b/src/main/java/net/zylklab/flink/sandbox/unordered_events/BufferedKeyedProcessFunction.java
@@ -0,0 +1,59 @@
+package net.zylklab.flink.sandbox.unordered_events;
+
+import java.util.HashMap;
+
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.zylklab.flink.sandbox.broadcaststate.pojo.Event;
+
+/**
+ * A KeyedProcessFunction that buffers events per key and re-emits them in
+ * timestamp order, driven by event-time timers.
+ */
+public class BufferedKeyedProcessFunction extends KeyedProcessFunction<String, Event, Event> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger _log = LoggerFactory.getLogger(BufferedKeyedProcessFunction.class);
+
+	// In-memory buffer of pending events, keyed by timestamp. Note that it holds
+	// at most one event per timestamp and key, and it is not checkpointed state.
+	private HashMap<Long, Event> eventsMap = new HashMap<>();
+
+	@Override
+	public void processElement(
+			Event in,
+			KeyedProcessFunction<String, Event, Event>.Context ctx,
+			Collector<Event> out)
+			throws Exception {
+
+		Long ts = in.getTs();
+		if (ctx.timerService().currentWatermark() < ts) {
+			// Buffer the event under its timestamp
+			eventsMap.put(ts, in);
+
+			// Register an event-time timer that fires when the watermark reaches this timestamp
+			ctx.timerService().registerEventTimeTimer(ts);
+		} else {
+			_log.warn("Current watermark has already passed this event!");
+		}
+	}
+
+	@Override
+	public void onTimer(
+			long timestamp,
+			KeyedProcessFunction<String, Event, Event>.OnTimerContext ctx,
+			Collector<Event> out)
+			throws Exception {
+
+		// The watermark has reached this timestamp: emit the buffered event
+		if (eventsMap.containsKey(timestamp)) {
+			out.collect(eventsMap.remove(timestamp));
+		} else {
+			_log.info("onTimer triggered but no value set for timestamp " + timestamp);
+		}
+	}
+}
diff --git a/src/main/java/net/zylklab/flink/sandbox/unordered_events/UnorderedEventsJob.java b/src/main/java/net/zylklab/flink/sandbox/unordered_events/UnorderedEventsJob.java
new file mode 100644
index 0000000..ed0ad34
--- /dev/null
+++ b/src/main/java/net/zylklab/flink/sandbox/unordered_events/UnorderedEventsJob.java
@@ -0,0 +1,81 @@
+package net.zylklab.flink.sandbox.unordered_events;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.zylklab.flink.sandbox.broadcaststate.pojo.Event;
+import net.zylklab.flink.sandbox.cep_examples.util.avro.AvroDeserializationSchema;
+import net.zylklab.flink.sandbox.cep_examples.util.avro.AvroSerializationSchema;
+
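+/**
+ * Flink job that consumes unordered Avro events from Kafka, assigns event-time
+ * timestamps and watermarks, reorders the stream per key with
+ * {@link BufferedKeyedProcessFunction} and publishes the ordered events back to Kafka.
+ */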
+public class UnorderedEventsJob {
+
+	private static final Logger _log = LoggerFactory.getLogger(UnorderedEventsJob.class);
+
+	private static final Integer PARALLELISM = 2;
+	private static final Integer WATERMARK_INTERVAL_MS = 500;
+	private static final Integer MAX_OUT_OF_ORDERNESS_MS = 1000;
+	private static final Integer MAX_WAIT_FOR_EVENTS_SEC = 60;
+
+	private static final String BOOTSTRAP_SERVERS = "amaterasu001.bigdata.zylk.net:6667, amaterasu002.bigdata.zylk.net:6667";
+	private static final String GROUP_ID = "flink_unordered";
+	private static final String SOURCE_TOPIC = "UNORDERED_EVENTS";
+	private static final String SINK_TOPIC = "ORDERED_EVENTS";
+
+	public static void main(String[] args) throws Exception {
+		_log.debug("Starting application");
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.getConfig().setAutoWatermarkInterval(WATERMARK_INTERVAL_MS);
+		env.setParallelism(PARALLELISM);
+
+		_log.debug("Environment created.");
+		UnorderedEventsJob w = new UnorderedEventsJob();
+		w.addJob(env);
+		env.execute("EventTimeWindowGroupAndProcessSubJob");
+	}
+
+	private void addJob(StreamExecutionEnvironment env) {
+
+		// Kafka consumer properties
+		Properties props = new Properties();
+		props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
+		props.setProperty("group.id", GROUP_ID);
+
+		// Kafka source with event-time timestamps and bounded-out-of-orderness watermarks
+		FlinkKafkaConsumerBase<Event> kafkaSource = new FlinkKafkaConsumer<>(SOURCE_TOPIC, new AvroDeserializationSchema<>(Event.class), props)
+				.assignTimestampsAndWatermarks(WatermarkStrategy
+						.<Event>forBoundedOutOfOrderness(Duration.ofMillis(MAX_OUT_OF_ORDERNESS_MS))
+						.withTimestampAssigner((event, ts) -> event.getTs()) // Timestamp extractor
+						.withIdleness(Duration.ofSeconds(MAX_WAIT_FOR_EVENTS_SEC))); // Do not let a partition that stops sending events hold back the watermark
+
+		FlinkKafkaProducer<Event> kafkaSink = new FlinkKafkaProducer<>(SINK_TOPIC, new AvroSerializationSchema<>(Event.class), props);
+
+		DataStreamSource<Event> stream = env.addSource(kafkaSource);
+
+		stream
+			.keyBy(event -> event.getVarId())
+			.process(new BufferedKeyedProcessFunction())
+			.addSink(kafkaSink);
+	}
+}