Skip to content

Latest commit

 

History

History
57 lines (43 loc) · 2.45 KB

README.md

File metadata and controls

57 lines (43 loc) · 2.45 KB

FS2 Kafka JsonSchema

Continuous Integration Maven Central

Provides FS2 Kafka Serializers and Deserializers that provide integration with Confluent Schema Registry for JSON messages with JSON Schemas.

Note: This library only works with Scala 3.3.x and above. For Scala 2.x, see here.

This functionality is backed by the following libraries:

Usage

Add the following to your build.sbt

resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven")
libraryDependencies += "io.kaizen-solutions" %% "fs2-kafka-jsonschema" % "<latest-version>"

Example

Define the datatype that you would like to send/receive over Kafka via the JSON + JSON Schema format. You do this by defining your datatype and providing a Pickler instance for it. The Pickler instance comes from the Tapir library.

import sttp.tapir.Schema.annotations.*
import sttp.tapir.json.pickler.*

final case class Book(
  @description("name of the book") name: String,
  @description("international standard book number") isbn: Int
)
object Book:
  given Pickler[Book] = Pickler.derived

Next, you can create a fs2 Kafka Serializer and Deserializer for this datatype and use it when building your FS2 Kafka producer/consumer.

import io.kaizensolutions.jsonschema.*
import cats.effect.*
import fs2.kafka.*

def bookSerializer[F[_]: Sync]: Resource[F, ValueSerializer[F, Book]] =
  JsonSchemaSerializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[F, Book]

def bookDeserializer[F[_]: Sync]: Resource[F, ValueDeserializer[F, Book]] =
  JsonSchemaDeserializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[F, Book]