Commit f4d53e1

Documentation

gfg001 committed Oct 6, 2019
1 parent e4be1bb commit f4d53e1

Showing 2 changed files with 50 additions and 10 deletions.

56 changes: 48 additions & 8 deletions README.md
@@ -29,7 +29,7 @@ private static final int DELTA_LIMIT = 30;

## Second one (EventTime)

There is an IoT device counting the number of events in a zone (for example, the number of bicycles crossing a point). These events are sent to a queue, serialized as Avro events. This is simulated by sending the events as text to a localhost socket. To start the socket, run this command:

```bash
nc -lk 9999
```
@@ -58,10 +58,50 @@ private static final long MAX_OUT_OF_ORDERNESS_MS = 3500l;
* **ALLOWED_LATENESS_TIME** the time during which an already processed window can still be reprocessed if a late event belonging to that window arrives
* **MAX_OUT_OF_ORDERNESS_MS** the maximum time to wait before a window fires for the first time; it lets out-of-order events arrive without being dropped (see the sketch below)
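A minimal event-time sketch shows where both constants plug in. This is a hedged illustration against the Flink 1.x DataStream API, not the repository's actual job: the class name, the `<zone>,<epochMillis>` line format on the socket, and the lateness value are our assumptions; the 5-second window and 3500 ms out-of-orderness come from this README.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeWindowSketch {

    private static final int ALLOWED_LATENESS_TIME = 10;        // seconds, illustrative value
    private static final long MAX_OUT_OF_ORDERNESS_MS = 3500L;  // as in the README

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.socketTextStream("localhost", 9999)
            // Assumed wire format: "<zone>,<epochMillis>", one event per line.
            .map(line -> {
                String[] parts = line.split(",");
                return Tuple2.of(parts[0], Long.parseLong(parts[1]));
            })
            .returns(Types.TUPLE(Types.STRING, Types.LONG))
            // Watermarks trail the highest timestamp seen by MAX_OUT_OF_ORDERNESS_MS,
            // so mildly disordered events still reach their window before it fires.
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(
                        Time.milliseconds(MAX_OUT_OF_ORDERNESS_MS)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> event) {
                        return event.f1;
                    }
                })
            // Count events per zone; the record timestamp survives this map.
            .map(event -> Tuple2.of(event.f0, 1L))
            .returns(Types.TUPLE(Types.STRING, Types.LONG))
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            // A late event arriving within ALLOWED_LATENESS_TIME re-fires its window.
            .allowedLateness(Time.seconds(ALLOWED_LATENESS_TIME))
            .sum(1)
            .print();

        env.execute("event-time window sketch");
    }
}
```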

### Results


|window type| window size | window slide | window allow lateness | max out of orderness | number of events| file |
|-----------|-------------|--------------|-----------------------|----------------------|-----------------|------|
|Normal| 5 seconds | - | 0 seconds | 0 ms | 5 windows with 5 events per window | ordered messages|

## Third one (NiFi + Kafka + Flink + Kerberos)

There is an IoT device counting the number of events in a zone (for example, the number of bicycles crossing a point). These events are sent to a queue, serialized as Avro events. This is simulated by sending events to a Kafka topic from a NiFi flow. Everything uses Kerberos and needs an HDP/HDF cluster.

* geohasevents-nifi-template.xml -> the NiFi template; it generates Avro records. The Avro schema is registered in the HWX Schema Registry
* geohashevent-pojo.avsc -> the Avro schema, shown below
```json
{
"namespace": "net.zylklab.flink.sandbox.cep_examples.pojo.auto.avro.pojo",
"type": "record",
"name": "GeoHashEventAvro",
"fields":
[
{"name": "geohash", "type": ["null","string"], "default": null},
{"name": "totalGPRSEvents", "type": ["null","int"], "default": null},
{"name": "timestamp", "type": ["null","long"], "default": null}
]
}
```
* jaas-client.conf -> the Kerberos JAAS config used by Flink (Kafka principal and Schema Registry principal); sketches of a conforming Avro record and a typical JAAS layout follow this list
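For reference, a record conforming to the schema above can be built with Avro's generic API. This is a minimal sketch: the field values and the relative path to the `.avsc` file are assumptions, not the repository's code.

```java
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GeoHashEventExample {
    public static void main(String[] args) throws Exception {
        // Parse the schema file shipped with the repository (path assumed).
        Schema schema = new Schema.Parser().parse(new File("geohashevent-pojo.avsc"));

        // All three fields are nullable unions with null defaults,
        // so any of them may be left unset or set to null.
        GenericRecord event = new GenericData.Record(schema);
        event.put("geohash", "ezjmgtw");                     // illustrative geohash
        event.put("totalGPRSEvents", 42);                    // illustrative count
        event.put("timestamp", System.currentTimeMillis());

        System.out.println(event); // prints the record as JSON
    }
}
```

The jaas-client.conf itself is not reproduced in this commit. Under Kerberos it typically declares one login context per client, roughly as below; the context names, keytab path, and principal are placeholders, so check the Kafka and HWX Schema Registry documentation for the exact names your versions expect.

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/flink.service.keytab"
  principal="flink@EXAMPLE.COM";
};

RegistryClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/flink.service.keytab"
  principal="flink@EXAMPLE.COM";
};
```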

The job is
```java
net.zylklab.flink.sandbox.kafka_example.job.EventTimeWindowGeoHashSubFromKafkaJob
```

where

```java
private static final String KAFKA_CONSUMER_GROUP = "consumer-flink";
private static final String KAFKA_BROKER = "enbarr001.bigdata.zylk.net:6667,enbarr002.bigdata.zylk.net:6667";
private static final String KAFKA_PROTOCOL = "SASL_PLAINTEXT";
private static final String KAFKA_TOPIC = "GEOHASH_EVENTS_AVRO";
private static final String KAFKA_KERBEROS_SERVICE_NAME = "kafka";
private static final String KAFKA_OFFSET = "earliest";
private static final String HWX_SCHEMA_REGISTRY = "http://enbarr001.bigdata.zylk.net:7788/api/v1";
```

* **KAFKA_CONSUMER_GROUP** the consumer group name
* **KAFKA_BROKER** the Kafka broker list
* **KAFKA_PROTOCOL** the Kafka security protocol (SASL_PLAINTEXT)
* **KAFKA_TOPIC** the topic name
* **KAFKA_KERBEROS_SERVICE_NAME** the Kerberos service name
* **KAFKA_OFFSET** the offset to start consuming from (earliest)
* **HWX_SCHEMA_REGISTRY** the HWX Schema Registry REST endpoint (all of these are wired together in the sketch below)
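Wired together, those constants reach the Kafka source roughly as follows. This is a hedged sketch: the real job deserializes Avro through the Schema Registry, while this version substitutes `SimpleStringSchema` to stay self-contained, and the class name is ours.

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaWiringSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                "enbarr001.bigdata.zylk.net:6667,enbarr002.bigdata.zylk.net:6667"); // KAFKA_BROKER
        props.setProperty("group.id", "consumer-flink");           // KAFKA_CONSUMER_GROUP
        props.setProperty("security.protocol", "SASL_PLAINTEXT");  // KAFKA_PROTOCOL
        props.setProperty("sasl.kerberos.service.name", "kafka");  // KAFKA_KERBEROS_SERVICE_NAME
        props.setProperty("auto.offset.reset", "earliest");        // KAFKA_OFFSET

        // SimpleStringSchema is a stand-in; the repository's job plugs in an
        // Avro deserializer backed by the HWX Schema Registry instead.
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("GEOHASH_EVENTS_AVRO", new SimpleStringSchema(), props);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(consumer).print();
        env.execute("kafka wiring sketch");
    }
}
```

Under Kerberos the JVM also needs the JAAS file registered, e.g. via `-Djava.security.auth.login.config=jaas-client.conf` in the Flink client and task JVM options.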



4 changes: 2 additions & 2 deletions EventTimeWindowGeoHashSubFromKafkaJob.java

@@ -49,7 +49,7 @@ public class EventTimeWindowGeoHashSubFromKafkaJob {
private static final String KAFKA_TOPIC = "GEOHASH_EVENTS_AVRO";
private static final String KAFKA_KERBEROS_SERVICE_NAME = "kafka";
private static final String KAFKA_OFFSET = "earliest";

private static final String HWX_SCHEMA_REGISTRY = "http://enbarr001.bigdata.zylk.net:7788/api/v1";


private static final String SCHEMA_REGISTRY_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name();
@@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception {
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY, 5000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY, 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY, 60 * 60 * 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_URL_KEY, "http://enbarr001.bigdata.zylk.net:7788/api/v1");
schemaRegistryProperties.put(SCHEMA_REGISTRY_URL_KEY, HWX_SCHEMA_REGISTRY);


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
