Add broadcast state case
acanba authored Jul 21, 2020
1 parent 7fe504d commit b008c07
39 changes: 39 additions & 0 deletions README.md
@@ -217,3 +217,42 @@ The processed events are sent back to a Kafka topic (*SINK_TOPIC*), consumed by
LOCATION "/processed_events"
```
where *LOCATION* path is the HDFS location where processed events are stored.

## Fifth one (Flink broadcast state)

An IoT device sends messages to Kafka in Avro format, carrying the value of a metric that has to stay within certain limits:

```avro
{
  "namespace": "net.zylklab.flink.sandbox.broadcaststate.pojo",
  "type": "record",
  "name": "Event",
  "fields":
  [
    {"name": "var_id",   "type": ["null","string"], "default": null},
    {"name": "var_name", "type": ["null","string"], "default": null},
    {"name": "value",    "type": ["null","double"], "default": null},
    {"name": "ts",       "type": ["null","long"],   "default": null}
  ]
}
```

The max and min limits for this value can change over time, so there is another Kafka topic where limit changes are published, also in Avro format:

```avro
{
  "namespace": "net.zylklab.flink.sandbox.broadcaststate.pojo",
  "type": "record",
  "name": "Limit",
  "fields":
  [
    {"name": "var_id",    "type": ["null","string"], "default": null},
    {"name": "var_name",  "type": ["null","string"], "default": null},
    {"name": "max_limit", "type": ["null","double"], "default": null},
    {"name": "min_limit", "type": ["null","double"], "default": null}
  ]
}
```
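As a reference, here is a minimal sketch (not the repository's actual code) of how the two topics could be read into Flink DataStreams, assuming `Event` and `Limit` are the `SpecificRecord` classes generated from the schemas above; the class name, topic names and Kafka properties are placeholders:

```java
import java.util.Properties;

import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import net.zylklab.flink.sandbox.broadcaststate.pojo.Event;
import net.zylklab.flink.sandbox.broadcaststate.pojo.Limit;

public class BroadcastStateJobSketch { // hypothetical class name

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder
        props.setProperty("group.id", "broadcast-state-demo");  // placeholder

        // Metric events coming from the IoT device.
        DataStream<Event> events = env.addSource(
                new FlinkKafkaConsumer<>("events-topic",         // hypothetical topic name
                        AvroDeserializationSchema.forSpecific(Event.class), props));

        // Limit updates, published whenever the allowed range changes.
        DataStream<Limit> limits = env.addSource(
                new FlinkKafkaConsumer<>("limits-topic",         // hypothetical topic name
                        AvroDeserializationSchema.forSpecific(Limit.class), props));

        // ... connect the two streams with broadcast state (see the sketch below) ...

        env.execute("broadcast-state-sketch");
    }
}
```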

The Flink job consumes from both topics and processes the events, comparing each event's value against its current limits. When a new limit is published to Kafka, the job updates the broadcast limits without needing a restart.
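Continuing the sketch above, a hedged illustration of the broadcast state pattern just described, using Flink's `MapStateDescriptor`, `BroadcastStream`, `KeyedBroadcastProcessFunction` and `Collector`. The accessors (`getVarId()`, `getValue()`, `getMaxLimit()`, `getMinLimit()`) are assumed to be the ones Avro generates for the schemas above:

```java
// Inside the same job, after building the events and limits streams.
MapStateDescriptor<String, Limit> limitsDescriptor =
        new MapStateDescriptor<>("limits", Types.STRING, TypeInformation.of(Limit.class));

// Every parallel instance of the keyed stream receives all limit updates.
BroadcastStream<Limit> broadcastLimits = limits.broadcast(limitsDescriptor);

DataStream<String> alerts = events
        .keyBy(event -> event.getVarId().toString())
        .connect(broadcastLimits)
        .process(new KeyedBroadcastProcessFunction<String, Event, Limit, String>() {

            @Override
            public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // Look up the current limits for this var_id in the broadcast state.
                Limit limit = ctx.getBroadcastState(limitsDescriptor).get(event.getVarId().toString());
                if (limit != null && event.getValue() != null
                        && (event.getValue() > limit.getMaxLimit() || event.getValue() < limit.getMinLimit())) {
                    out.collect("Value out of range for " + event.getVarName() + ": " + event.getValue());
                }
            }

            @Override
            public void processBroadcastElement(Limit limit, Context ctx, Collector<String> out) throws Exception {
                // A new limit simply overwrites the previous one for that var_id,
                // so the job picks it up without being restarted.
                ctx.getBroadcastState(limitsDescriptor).put(limit.getVarId().toString(), limit);
            }
        });
```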
