
Commit c8186c1

Merge pull request #58 from nomisRev/wip-consumer-loop
Kafka Consumer Event Loop
2 parents 8a239d2 + b8b45be

29 files changed (+1885, -205 lines)

README.md

Lines changed: 60 additions & 53 deletions
@@ -2,7 +2,6 @@ Module kotlin-kafka
 
 [![Maven Central](https://img.shields.io/maven-central/v/io.github.nomisrev/kotlin-kafka?color=4caf50&label=latest%20release)](https://maven-badges.herokuapp.com/maven-central/io.github.nomisrev/kotlin-kafka)
 
-<!--- TEST_NAME ReadmeTest -->
 <!--- TOC -->
 
 * [Rationale](#rationale)
@@ -12,22 +11,31 @@ Module kotlin-kafka
 
 <!--- END -->
 
-This project is still under development, andd started as a playground where I was playing around with Kafka in Kotlin
-and the Kafka SDK whilst reading the Kafka book Definite Guide from Confluent.
-https://www.confluent.io/resources/kafka-the-definitive-guide-v2/
-
 ## Rationale
 
-At the time of starting this repository I didn't find any bindings between Kafka SDK and Kotlin suspension. These
-operators should be implemented low-level, so they can guarantee correct cancellation support, and high optimised
-runtimes.
+At the time of starting this repository I didn't find any bindings between the Kafka SDK and Kotlin suspension, or KotlinX Coroutines Flow.
+These operators should be implemented low-level, so they can guarantee correct cancellation support and highly optimised runtimes.
+
+Some important aspects of Kafka are tricky to implement with the "low-level" Kafka API,
+especially properly streaming records from Kafka and correctly committing them.
+Additional complexity is involved in this process; more details [here](https://tuleism.github.io/blog/2021/parallel-backpressured-kafka-consumer/).
+
+To solve these problems, a couple of projects on the JVM already exist:
+- [Alpakka Kafka](https://github.com/akka/alpakka-kafka)
+- [reactor-kafka](https://github.com/reactor/reactor-kafka)
+
+There was no implementation for KotlinX Coroutines Flow;
+you can, however, quite easily use reactor-kafka with the [KotlinX Coroutines Reactor bindings](https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-reactor/README.md).
+
+This project implements the same strategies as [reactor-kafka] directly on top of KotlinX Coroutines to gain **all** of their benefits,
+and to open the door to potentially becoming a Kotlin MPP library in the future.
 
 ## Goals
 
-- Lean Core library built on top of Kotlin Std & KotlinX Coroutines (possible extensions with Arrow in additional
-  module)
+- Lean Core library built on top of Kotlin Std & KotlinX Coroutines
 - Extensions to easily operate over the Kafka SDK with KotlinX Coroutines and `suspend`.
 - Flow based operators, so you can easily compose KotlinX Flow based Kafka programs
+- Strong guarantees about committing record offsets, and performance optimisations in regard to re-balancing/partitioning.
 - example for testing Kafka with Test Containers in Kotlin.
 
 ## Adding Dependency
@@ -43,20 +51,24 @@ dependencies {
 ## Example
 
 <!--- INCLUDE
-import java.util.UUID
-import kotlinx.coroutines.Dispatchers.Default
+import io.github.nomisRev.kafka.receiver.KafkaReceiver
+import io.github.nomisRev.kafka.receiver.ReceiverSettings
+import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.asFlow
-import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.map
 import kotlinx.coroutines.flow.take
 import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.serialization.IntegerDeserializer
 import org.apache.kafka.common.serialization.IntegerSerializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
+import java.util.UUID
 -->
 
 ```kotlin
@@ -66,49 +78,46 @@ value class Key(val index: Int)
 @JvmInline
 value class Message(val content: String)
 
-fun main(): Unit =
-  runBlocking(Default) {
-    val topicName = "test-topic"
-    val msgCount = 10
-    val kafka = Kafka.container
+fun main(): Unit = runBlocking(Dispatchers.Default) {
+  val topicName = "test-topic"
+  val msgCount = 10
+  val kafka = Kafka.container
+
+  Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
+    client.createTopic(NewTopic(topicName, 1, 1))
+  }
 
-    Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
-      client.createTopic(NewTopic(topicName, 1, 1))
+  coroutineScope { // Run producer and consumer in a single scope
+    launch(Dispatchers.IO) { // Send msgCount messages, and then close the producer
+      val settings: ProducerSettings<Key, Message> = ProducerSettings(
+        kafka.bootstrapServers,
+        IntegerSerializer().imap { key: Key -> key.index },
+        StringSerializer().imap { msg: Message -> msg.content },
+        Acks.All
+      )
+      (1..msgCount)
+        .asFlow()
+        .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
+        .produce(settings)
+        .collect(::println)
     }
 
-    coroutineScope { // Run produces and consumer in a single scope
-      launch { // Send 20 messages, and then close the producer
-        val settings: ProducerSettings<Key, Message> =
-          ProducerSettings(
-            kafka.bootstrapServers,
-            IntegerSerializer().imap { key: Key -> key.index },
-            StringSerializer().imap { msg: Message -> msg.content },
-            Acks.All
-          )
-        (1..msgCount)
-          .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
-          .asFlow()
-          .produce(settings)
-          .collect(::println)
-      }
-
-      launch { // Consume 20 messages as a stream, and then close the consumer
-        val settings: ConsumerSettings<Key, Message> =
-          ConsumerSettings(
-            kafka.bootstrapServers,
-            IntegerDeserializer().map(::Key),
-            StringDeserializer().map(::Message),
-            groupId = UUID.randomUUID().toString(),
-            autoOffsetReset = AutoOffsetReset.Earliest
-          )
-        kafkaConsumer(settings)
-          .subscribeTo(topicName)
-          .take(msgCount)
-          .map { "${it.key()} -> ${it.value()}" }
-          .collect(::println)
-      }
+    launch(Dispatchers.IO) { // Consume msgCount messages as a stream, and then close the consumer
+      val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
+        kafka.bootstrapServers,
+        IntegerDeserializer().map(::Key),
+        StringDeserializer().map(::Message),
+        groupId = UUID.randomUUID().toString(),
+        autoOffsetReset = AutoOffsetReset.Earliest
+      )
+      KafkaReceiver(settings)
+        .receive(topicName)
+        .take(msgCount)
+        .map { "${it.key()} -> ${it.value()}" }
+        .collect(::println)
     }
   }
+}
 ```
 
 > You can get the full code [here](guide/example/example-readme-01.kt).
@@ -135,5 +144,3 @@ Key(index=8) -> Message(content=msg: 8)
 Key(index=9) -> Message(content=msg: 9)
 Key(index=10) -> Message(content=msg: 10)
 ```
-
-<!--- TEST -->
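
Note: the updated Goals promise strong guarantees about committing record offsets, but the README example above only collects records. Below is a minimal sketch of manual acknowledgement with the new receiver API, mirroring the `it.offset.acknowledge()` pattern that appears in guide/src/main/kotlin/main.kt further down; the String deserializers and group id here are illustrative assumptions, not part of this commit.

```kotlin
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import org.apache.kafka.common.serialization.StringDeserializer

// Sketch: stream records and acknowledge each offset after processing,
// so the receiver can commit it on our behalf.
suspend fun consumeAndAcknowledge(bootstrapServers: String, topic: String) {
  val settings: ReceiverSettings<String, String> = ReceiverSettings(
    bootstrapServers,
    StringDeserializer(),
    StringDeserializer(),
    groupId = "example-group" // illustrative group id, not from this commit
  )
  KafkaReceiver(settings)
    .receive(topic)
    .collect { record ->
      println("${record.key()} -> ${record.value()}")
      record.offset.acknowledge() // mark the record as processed
    }
}
```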

build.gradle.kts

Lines changed: 5 additions & 2 deletions
@@ -34,8 +34,11 @@ dependencies {
   api(libs.kotlinx.coroutines.core)
   api(libs.kotlinx.coroutines.jdk8)
   api(libs.kafka.clients)
-
+  implementation(libs.slf4j.api)
+
   testImplementation(libs.bundles.kotest)
+  testImplementation(libs.testcontainers.kafka)
+  testImplementation(libs.slf4j.simple)
 }
 
 configure<KnitPluginExtension> {
@@ -72,7 +75,7 @@ tasks {
     kotlinOptions.jvmTarget = "1.8"
   }
 
-  val cleanDocs = register<Delete>("cleanDocs") {
+  register<Delete>("cleanDocs") {
     val folder = file("docs").also { it.mkdir() }
     val docsContent = folder.listFiles().filter { it != folder }
     delete(docsContent)
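
For projects depending on the library itself, these build changes don't affect the published coordinates; the artifact still comes from Maven Central (coordinates from the README badge above; substitute the latest release for the placeholder version):

```kotlin
dependencies {
  // Placeholder version: pick the latest release shown by the Maven Central badge.
  implementation("io.github.nomisrev:kotlin-kafka:<latest-version>")
}
```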

guide/example/example-readme-01.kt

Lines changed: 42 additions & 41 deletions
@@ -4,67 +4,68 @@ import io.github.nomisRev.kafka.*
 import java.util.Properties
 import kotlinx.coroutines.runBlocking
 
-import java.util.UUID
-import kotlinx.coroutines.Dispatchers.Default
+import io.github.nomisRev.kafka.receiver.KafkaReceiver
+import io.github.nomisRev.kafka.receiver.ReceiverSettings
+import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.asFlow
-import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.map
 import kotlinx.coroutines.flow.take
 import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.serialization.IntegerDeserializer
 import org.apache.kafka.common.serialization.IntegerSerializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
+import java.util.UUID
 
 @JvmInline
 value class Key(val index: Int)
 
 @JvmInline
 value class Message(val content: String)
 
-fun main(): Unit =
-  runBlocking(Default) {
-    val topicName = "test-topic"
-    val msgCount = 10
-    val kafka = Kafka.container
+fun main(): Unit = runBlocking(Dispatchers.Default) {
+  val topicName = "test-topic"
+  val msgCount = 10
+  val kafka = Kafka.container
 
-    Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
-      client.createTopic(NewTopic(topicName, 1, 1))
-    }
+  Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
+    client.createTopic(NewTopic(topicName, 1, 1))
+  }
 
-    coroutineScope { // Run produces and consumer in a single scope
-      launch { // Send 20 messages, and then close the producer
-        val settings: ProducerSettings<Key, Message> =
-          ProducerSettings(
-            kafka.bootstrapServers,
-            IntegerSerializer().imap { key: Key -> key.index },
-            StringSerializer().imap { msg: Message -> msg.content },
-            Acks.All
-          )
-        (1..msgCount)
-          .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
-          .asFlow()
-          .produce(settings)
-          .collect(::println)
-      }
+  coroutineScope { // Run producer and consumer in a single scope
+    launch(Dispatchers.IO) { // Send msgCount messages, and then close the producer
+      val settings: ProducerSettings<Key, Message> = ProducerSettings(
+        kafka.bootstrapServers,
+        IntegerSerializer().imap { key: Key -> key.index },
+        StringSerializer().imap { msg: Message -> msg.content },
+        Acks.All
+      )
+      (1..msgCount)
+        .asFlow()
+        .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
+        .produce(settings)
+        .collect(::println)
+    }
 
-      launch { // Consume 20 messages as a stream, and then close the consumer
-        val settings: ConsumerSettings<Key, Message> =
-          ConsumerSettings(
-            kafka.bootstrapServers,
-            IntegerDeserializer().map(::Key),
-            StringDeserializer().map(::Message),
-            groupId = UUID.randomUUID().toString(),
-            autoOffsetReset = AutoOffsetReset.Earliest
-          )
-        kafkaConsumer(settings)
-          .subscribeTo(topicName)
-          .take(msgCount)
-          .map { "${it.key()} -> ${it.value()}" }
-          .collect(::println)
-      }
+    launch(Dispatchers.IO) { // Consume msgCount messages as a stream, and then close the consumer
+      val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
+        kafka.bootstrapServers,
+        IntegerDeserializer().map(::Key),
+        StringDeserializer().map(::Message),
+        groupId = UUID.randomUUID().toString(),
+        autoOffsetReset = AutoOffsetReset.Earliest
+      )
+      KafkaReceiver(settings)
+        .receive(topicName)
+        .take(msgCount)
+        .map { "${it.key()} -> ${it.value()}" }
+        .collect(::println)
     }
   }
+}

guide/src/main/kotlin/main.kt

Lines changed: 18 additions & 17 deletions
@@ -1,10 +1,12 @@
 package io.github.nomisRev.kafka
 
+import io.github.nomisRev.kafka.receiver.KafkaReceiver
+import io.github.nomisRev.kafka.receiver.ReceiverSettings
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.asFlow
-import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.map
 import kotlinx.coroutines.flow.take
 import kotlinx.coroutines.launch
@@ -16,7 +18,6 @@ import org.apache.kafka.common.serialization.IntegerSerializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
 import java.util.UUID
-import kotlin.time.Duration.Companion.milliseconds
 
 @JvmInline
 value class Key(val index: Int)
@@ -26,7 +27,7 @@ value class Message(val content: String)
 
 fun main(): Unit = runBlocking(Dispatchers.Default) {
   val topicName = "test-topic"
-  val msgCount = 10
+  val msgCount = 25
   val kafka = Kafka.container
 
   Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
@@ -42,31 +43,31 @@ fun main(): Unit = runBlocking(Dispatchers.Default) {
         Acks.All
       )
       (1..msgCount)
-        .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
         .asFlow()
+        .map { index ->
+          delay(index * 50L)
+          ProducerRecord(topicName, Key(index), Message("msg: $index"))
+        }
         .produce(settings)
         .collect(::println)
     }
 
     launch(Dispatchers.IO) { // Consume msgCount messages as a stream, and then close the consumer
-      val settings: ConsumerSettings<Key, Message> = ConsumerSettings(
+      val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
         kafka.bootstrapServers,
         IntegerDeserializer().map(::Key),
         StringDeserializer().map(::Message),
         groupId = UUID.randomUUID().toString(),
-        autoOffsetReset = AutoOffsetReset.Earliest,
-        enableAutoCommit = false
+        autoOffsetReset = AutoOffsetReset.Earliest
       )
-
-      KafkaConsumer(settings).asFlow()
-        .subscribeTo(topicName)
-        .tap { (key, value) -> println("$key -> $value") }
-        .commitBatchWithin(settings, 3, 10.milliseconds)
-        .take(4)
-        .collect()
+      KafkaReceiver(settings)
+        .receive(topicName)
+        .take(msgCount)
+        .collect {
+          delay(75)
+          println("${Thread.currentThread().name} => ${it.key()} -> ${it.value()}")
+          it.offset.acknowledge()
+        }
     }
   }
 }
-
-fun <A> Flow<A>.tap(also: suspend (A) -> Unit): Flow<A> =
-  map { it.also { also(it) } }
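
The custom `tap` helper is deleted at the bottom of this file. For reference, the standard `onEach` operator from kotlinx.coroutines covers the same side-effect-per-element use case; a small self-contained sketch, not part of this commit:

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
  flowOf(1, 2, 3)
    .onEach { println("saw $it") } // runs the side effect and re-emits each element, as tap did
    .collect {} // drain the flow; the member collect needs no extra import
}
```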
