Skip to content

adamko-dev/kotka-streams

Folders and files

NameName
Last commit message
Last commit date

Latest commit

905d66e · Feb 24, 2024

History

94 Commits
Feb 24, 2024
Dec 22, 2023
Mar 17, 2023
Jan 25, 2024
Mar 20, 2023
Mar 13, 2023
Jan 15, 2022
Jan 15, 2022
Mar 20, 2023
Dec 21, 2023
Dec 21, 2023
Dec 21, 2023
Jan 1, 2023
Jul 17, 2022
Dec 21, 2023

Repository files navigation

GitHub license Maven Central Maven Central Snapshots

Kotka Streams - Kotlin for Kafka Streams

Using Kotka means a more pleasant experience while using Kafka Streams.

Quickstart

Add a dependency on kotka-streams-extensions for the basics.

// build.gradle.kts
repositories {
  mavenCentral()
}

dependencies {
  implementation("dev.adamko.kotka:kotka-streams-extensions:$kotkaVersion")
}

Modules

There are three modules. Add a dependency on com.github.adamko-dev:kotka-streams to get them all at once

dependencies {
  implementation("dev.adamko.kotka:kotka-streams:$kotkaVersion")
}

kotka-streams-extensions

Contains the basic extension functions to make Kafka Streams more Kotlin-esque.

  implementation("dev.adamko.kotka:kotka-streams-extensions:$kotkaVersion")
import dev.adamko.kotka.extensions.tables.*
import dev.adamko.kotka.extensions.streams.*
import dev.adamko.kotka.extensions.*

data class MusicalBand(
  val name: String,
  val memberNames: List<String>,
)

builder.stream<String, MusicalBand>("musical-bands")
  .flatMap("band-member-names-to-band-name") { _: String, band: MusicalBand ->
    band.memberNames.map { memberName -> memberName to band.name }
  }
  .groupByKey(groupedAs("map-of-band-member-to-band-names"))

kotka-streams-framework

A light framework for structuring topics and records.

  implementation("dev.adamko.kotka:kotka-streams-framework:$kotkaVersion")

Use TopicRecord to standardise the data on each topic. Records can now easily be converted from one type, to another.

import dev.adamko.kotka.extensions.tables.*
import dev.adamko.kotka.extensions.streams.*
import dev.adamko.kotka.extensions.*
import dev.adamko.kotka.topicdata.*

data class Animal(
  val id: Long,
  val name: String,
) : TopicRecord<Long> {
  override val topicKey: Long by ::id
}

data class Pet(
  val id: Long,
  val name: String,
) : TopicRecord<Long> {
  override val topicKey: Long by ::id
}

val petUpdates = builder.stream<Long, Animal>("animals")
  .mapTopicRecords("convert-animals-to-pets") { _, animal ->
    Pet(animal.id, animal.name)
  }

Use KeyValueSerdes<K, V> to define both the key and value serdes for a topic. A TopicDefinition<K, V> ties both of these together.

/** All [Pet] updates */
object PetUpdatesTopic : TopicDefinition<Long, Animal> {
  override val topicName = "pet-updates"
  override val serdes = KeyValueSerdes(Serdes.Long(), PetSerde())
}

petUpdates
  .to(
    PetUpdatesTopic.topicName,
    PetUpdatesTopic.serdes.producer("send-pet-updates-to-pet-update-topic")
  )

kotka-streams-kotlinx-serialization

Use Kotlinx Serialization for topic key/value serdes.

implementation("dev.adamko.kotka:kotka-streams-kotlinx-serialization:$kotkaVersion")
import dev.adamko.kotka.extensions.tables.*
import dev.adamko.kotka.extensions.streams.*
import dev.adamko.kotka.extensions.*
import dev.adamko.kotka.topicdata.*
import dev.adamko.kotka.kxs.*

val jsonMapper = Json {}

@Serializable
data class Sku(
  val sku: String
)

@Serializable
data class ShopItem(
  val id: Sku,
  val name: String,
) : TopicRecord<Sku> {
  override val topicKey: Sku by ::id
}

object ShopItemTopic : TopicDefinition<Long, ShopItem> {
  override val topicName = "shop-item-updates"
  override val serdes = KeyValueSerdes.kxsJson(jsonMapper)
}