Commit

add Unordered job

[zylk] Aian Cantabrana committed Aug 5, 2020
1 parent b008c07 commit 95625fc
Showing 4 changed files with 174 additions and 7 deletions.
31 changes: 25 additions & 6 deletions README.md
@@ -188,7 +188,7 @@ There is an IoT device counting the number of different events in a zone (for ex
* **WINDOW_TIME_SIZE** the size (time) of the window
* **ALLOWED_LATENESS_TIME** the time during which an already processed window can be re-fired if a late event belonging to it arrives
* **MAX_OUT_OF_ORDERNESS_MS** the maximum expected out-of-orderness of events; delaying the first firing of a window by this amount allows out-of-order events to arrive without being lost
* WATERMARK_INTERVAL_MS the interval (time) in which the watermark will be updated
* **WATERMARK_INTERVAL_MS** the interval (time) in which the watermark will be updated
* **BOOTSTRAP_SERVERS** the kafka broker list
* **GROUP_ID** the consumer group name
* **SOURCE_TOPIC** the raw events kafka topic
@@ -217,17 +217,17 @@ The processed events are sent back to a Kafka topic (*SINK_TOPIC*), consumed by
LOCATION "/processed_events"
```
where *LOCATION* path is the HDFS location where processed events are stored.

## Fifth one (Flink broadcast state)

There is an IoT device sending Avro-serialized messages to Kafka with the value of some metric that has to stay within certain limits:

```avro
{
"namespace": "net.zylklab.flink.sandbox.broadcaststate.pojo",
"type": "record",
"name": "Event",
"fields":
"fields":
[
{"name": "var_id", "type": ["null","string"], "default": null},
{"name": "var_name", "type": ["null","string"], "default": null},
@@ -244,7 +244,7 @@ These values' max and min limits could change over time, so there is another Kafka
"namespace": "net.zylklab.flink.sandbox.broadcaststate.pojo",
"type": "record",
"name": "Limit",
"fields":
"fields":
[
{"name": "var_id", "type": ["null","string"], "default": null},
{"name": "var_name", "type": ["null","string"], "default": null},
@@ -256,3 +256,22 @@ These values' max and min limits could change over time, so there is another Kafka

The Flink job consumes from both topics and processes each event, comparing its values against the current limits. When a new limit is published to Kafka, the job picks up the new limit without needing a restart.
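
The broadcast-state job itself is not part of this commit, so the following is only a minimal sketch of the pattern described above. The `checkLimits` helper, the `String` alert output and the numeric `getValue()`/`getMax()`/`getMin()` accessors are assumptions (the full Avro schemas are truncated above); `getVarId()` matches the accessor used elsewhere in this repository:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import net.zylklab.flink.sandbox.broadcaststate.pojo.Event;
import net.zylklab.flink.sandbox.broadcaststate.pojo.Limit;

public class BroadcastLimitsSketch {

    // Descriptor for the broadcast state holding the latest Limit per var_id
    private static final MapStateDescriptor<String, Limit> LIMITS_DESCRIPTOR =
            new MapStateDescriptor<>("limits", String.class, Limit.class);

    public static DataStream<String> checkLimits(DataStream<Event> events, DataStream<Limit> limits) {
        // Broadcast the limits stream so every parallel instance sees every update
        BroadcastStream<Limit> broadcastLimits = limits.broadcast(LIMITS_DESCRIPTOR);

        return events
                .connect(broadcastLimits)
                .process(new BroadcastProcessFunction<Event, Limit, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Look up the latest limit for this variable (read-only access)
                        Limit limit = ctx.getBroadcastState(LIMITS_DESCRIPTOR).get(event.getVarId());
                        if (limit != null && (event.getValue() > limit.getMax() || event.getValue() < limit.getMin())) {
                            out.collect("Value out of limits for " + event.getVarId());
                        }
                    }

                    @Override
                    public void processBroadcastElement(Limit limit, Context ctx, Collector<String> out) throws Exception {
                        // A newly published limit replaces the previous one: no job restart needed
                        ctx.getBroadcastState(LIMITS_DESCRIPTOR).put(limit.getVarId(), limit);
                    }
                });
    }
}
```

Because `processBroadcastElement` runs on every parallel instance, each instance keeps an identical copy of the limits map, which is what makes the lookup in `processElement` safe.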

## Order an unordered stream of events

There is an IoT device counting the number of different events in a zone (for example the number of cars, bicycles and motorbikes crossing a point). These events are sent to a Kafka topic, serialized as Avro events; here they are simulated by a NiFi flow publishing to the topic. The events are not produced in order and, for various downstream processes, this can be a problem.

The class `net.zylklab.flink.sandbox.unordered_events.UnorderedEventsJob` defines a Flink job that consumes the unordered data from a Kafka topic, orders it and sends the ordered data back to another Kafka topic. To do that, it uses the `net.zylklab.flink.sandbox.unordered_events.BufferedKeyedProcessFunction` class (added below in this commit), which buffers each event until the watermark reaches the event's timestamp and then emits it; for example, events arriving with timestamps 3, 1, 2 are emitted as 1, 2, 3.

The main parameters to take into account are located in `net.zylklab.flink.sandbox.unordered_events.UnorderedEventsJob`:

```java
private static final Integer WATERMARK_INTERVAL_MS = 500;
private static final Integer MAX_OUT_OF_ORDERNESS_MS = 1000;
private static final Integer MAX_WAIT_FOR_EVENTS_SEC = 60;
```

where

- **WATERMARK_INTERVAL_MS**: the interval (time) at which the watermark is updated.
- **MAX_OUT_OF_ORDERNESS_MS**: the bound on how out of order events may be; the watermark trails the highest timestamp seen so far by this amount, so events arriving within the bound are buffered instead of lost (see the worked example after this list).
- **MAX_WAIT_FOR_EVENTS_SEC**: when consuming from a Kafka topic with more than one partition, a partition can stop delivering events (this usually happens when the topic is partitioned by key). This parameter defines how long the job waits for new data from a partition before marking it idle, so that an inactive partition does not hold back the watermark.
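
To make the interplay of these parameters concrete, here is a small worked example, assuming Flink's built-in bounded-out-of-orderness generator, which emits `maxTimestampSeen - MAX_OUT_OF_ORDERNESS_MS - 1` as the watermark:

```java
// Worked example with MAX_OUT_OF_ORDERNESS_MS = 1000 and
// WATERMARK_INTERVAL_MS = 500 (watermarks emitted every 500 ms):
//
// Event timestamps as they arrive:  5000, 7000, 6200, 3999
// Max timestamp seen so far:        5000, 7000, 7000, 7000
// Watermark after next interval:    3999, 5999, 5999, 5999
//
// - The event at 6200 arrives out of order but within the bound: it is
//   buffered, and its timer fires once the watermark passes 6200.
// - The event at 3999 arrives when the watermark is already 5999: it is
//   too late and is dropped (BufferedKeyedProcessFunction logs a warning).
```
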
10 changes: 9 additions & 1 deletion pom.xml
@@ -13,7 +13,8 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.1</flink.version>
<!-- <flink.version>1.9.1</flink.version> -->
<flink.version>1.11.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
@@ -52,6 +53,13 @@
<version>${flink.version}</version>
</dependency>

<!-- Needed as of Flink 1.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>


<dependency>
<groupId>org.slf4j</groupId>
59 changes: 59 additions & 0 deletions src/main/java/net/zylklab/flink/sandbox/unordered_events/BufferedKeyedProcessFunction.java
@@ -0,0 +1,59 @@
package net.zylklab.flink.sandbox.unordered_events;

import java.util.HashMap;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.zylklab.flink.sandbox.broadcaststate.pojo.Event;

/**
 * This class provides an implementation on top of Flink's KeyedProcessFunction
 * that acts as a buffer for re-ordering out-of-order events by timestamp.
 */
public class BufferedKeyedProcessFunction extends KeyedProcessFunction<String, Event, Event> {

    private static final long serialVersionUID = 1L;

    private static final Logger _log = LoggerFactory.getLogger(BufferedKeyedProcessFunction.class);

    // Pending events indexed by timestamp. Note: a plain Java map, not
    // checkpointed Flink state, so buffered events are lost on restart, and
    // an event with a duplicate timestamp overwrites the previous one.
    private HashMap<Long, Event> eventsMap = new HashMap<>();

    @Override
    public void processElement(
            Event in,
            KeyedProcessFunction<String, Event, Event>.Context ctx,
            Collector<Event> out)
            throws Exception {

        Long ts = in.getTs();
        if (ctx.timerService().currentWatermark() < ts) {
            // Buffer the event, keyed by its timestamp
            eventsMap.put(ts, in);

            // Register an event-time timer to fire when the watermark reaches this timestamp
            ctx.timerService().registerEventTimeTimer(ts);
        } else {
            // Too late: the watermark has already passed, so the event is dropped
            _log.warn("Current watermark has already passed this event!");
        }
    }

    @Override
    public void onTimer(
            long timestamp,
            KeyedProcessFunction<String, Event, Event>.OnTimerContext ctx,
            Collector<Event> out)
            throws Exception {

        // The watermark has reached this timestamp: emit the buffered event
        if (eventsMap.containsKey(timestamp)) {
            out.collect(eventsMap.get(timestamp));
            eventsMap.remove(timestamp);
        } else {
            _log.info("onTimer triggered but no value set for timestamp {}", timestamp);
        }
    }
}
81 changes: 81 additions & 0 deletions src/main/java/net/zylklab/flink/sandbox/unordered_events/UnorderedEventsJob.java
@@ -0,0 +1,81 @@
package net.zylklab.flink.sandbox.unordered_events;

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.zylklab.flink.sandbox.broadcaststate.pojo.Event;
import net.zylklab.flink.sandbox.cep_examples.util.avro.AvroDeserializationSchema;
import net.zylklab.flink.sandbox.cep_examples.util.avro.AvroSerializationSchema;

public class UnorderedEventsJob {

    private static final Logger _log = LoggerFactory.getLogger(UnorderedEventsJob.class);

    private static final Integer PARALLELISM = 2;
    private static final Integer WATERMARK_INTERVAL_MS = 500;
    private static final Integer MAX_OUT_OF_ORDERNESS_MS = 1000;
    private static final Integer MAX_WAIT_FOR_EVENTS_SEC = 60;

    private static final String BOOTSTRAP_SERVERS = "amaterasu001.bigdata.zylk.net:6667, amaterasu002.bigdata.zylk.net:6667";
    private static final String GROUP_ID = "flink_unordered";
    private static final String SOURCE_TOPIC = "UNORDERED_EVENTS";
    private static final String SINK_TOPIC = "ORDERED_EVENTS";

    public static void main(String[] args) throws Exception {
        _log.debug("Starting application");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(WATERMARK_INTERVAL_MS);
        env.setParallelism(PARALLELISM);

        _log.debug("Environment created.");
        UnorderedEventsJob w = new UnorderedEventsJob();
        w.addJob(env);
        env.execute("EventTimeWindowGroupAndProcessSubJob");
    }

    private void addJob(StreamExecutionEnvironment env) {

        // Kafka consumer properties
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("group.id", GROUP_ID);

        FlinkKafkaConsumerBase<Event> kafkaSource = new FlinkKafkaConsumer<>(SOURCE_TOPIC, new AvroDeserializationSchema<>(Event.class), props)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ofMillis(MAX_OUT_OF_ORDERNESS_MS))
                        .withTimestampAssigner((event, ts) -> event.getTs()) // Timestamp extractor
                        .withIdleness(Duration.ofSeconds(MAX_WAIT_FOR_EVENTS_SEC)) // Mark a partition as idle when it stops sending events
                );

        FlinkKafkaProducer<Event> kafkaSink = new FlinkKafkaProducer<>(SINK_TOPIC, new AvroSerializationSchema<>(Event.class), props);

        DataStreamSource<Event> stream = env.addSource(kafkaSource);

        stream
            .keyBy(event -> event.getVarId())
            .process(new BufferedKeyedProcessFunction())
            .addSink(kafkaSink);
    }
}
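
Finally, a quick way to sanity-check the buffering logic without Kafka is a bounded local job. This harness is not part of the commit; the `Event.newBuilder()` calls assume the usual Avro-generated builder API and that the unset schema fields have null defaults:

```java
package net.zylklab.flink.sandbox.unordered_events;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import net.zylklab.flink.sandbox.broadcaststate.pojo.Event;

public class UnorderedEventsLocalTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // With a 500 ms watermark interval, all three elements below are read
        // before the first watermark is emitted, so none are dropped as late.
        env.getConfig().setAutoWatermarkInterval(500);

        env.fromElements(
                // Assumed Avro builder API; unset schema fields default to null
                Event.newBuilder().setVarId("v1").setTs(3000L).build(),
                Event.newBuilder().setVarId("v1").setTs(1000L).build(),
                Event.newBuilder().setVarId("v1").setTs(2000L).build())
            .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofMillis(1000))
                .withTimestampAssigner((event, ts) -> event.getTs()))
            .keyBy(event -> event.getVarId())
            .process(new BufferedKeyedProcessFunction())
            .print(); // on end of input the final watermark fires all timers:
                      // expected output order is 1000, 2000, 3000

        env.execute("UnorderedEventsLocalTest");
    }
}
```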
