
Commit

Merge pull request #2 from kaizen-solutions/feature/readme
Improvements
calvinlfer authored Jul 16, 2024
2 parents 793cd17 + 2e9a6d9 commit cd75c5f
Showing 5 changed files with 40 additions and 9 deletions.
37 changes: 36 additions & 1 deletion README.md
@@ -13,10 +13,45 @@ This functionality is backed by the following libraries:
- [FS2 kafka](https://github.com/fd4s/fs2-kafka)
- [Confluent Schema Registry](https://github.com/confluentinc/schema-registry)

### Usage ###
### Usage

Add the following to your `build.sbt`:
```sbt
resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven")
libraryDependencies += "io.kaizen-solutions" %% "fs2-kafka-jsonschema" % "<latest-version>"
```

### Example

Define the datatype that you would like to send and receive over Kafka in the JSON + JSON Schema format, and provide a `Pickler` instance for it. The `Pickler` typeclass comes from the Tapir library.

```scala
import sttp.tapir.Schema.annotations.*
import sttp.tapir.json.pickler.*

final case class Book(
  @description("name of the book") name: String,
  @description("international standard book number") isbn: Int
)

object Book:
  given Pickler[Book] = Pickler.derived
```

Next, you can create an fs2-kafka `Serializer` and `Deserializer` for this datatype and use them when building your FS2 Kafka producer/consumer.

```scala
import io.kaizensolutions.jsonschema.*
import cats.effect.*
import fs2.kafka.*

def bookSerializer[F[_]: Sync]: Resource[F, ValueSerializer[F, Book]] =
  JsonSchemaSerializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[F, Book]

def bookDeserializer[F[_]: Sync]: Resource[F, ValueDeserializer[F, Book]] =
  JsonSchemaDeserializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[F, Book]
```
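
These resources can then be plugged into fs2-kafka's producer and consumer settings. Below is a minimal sketch of that wiring, assuming fs2-kafka 3.x (where `ProducerSettings`/`ConsumerSettings` accept explicit key and value (de)serializers); the broker address, group id, and topic used here are placeholder assumptions, not part of this library:

```scala
import cats.effect.*
import fs2.kafka.*

// Sketch only: broker address and group id are placeholders.
// Builds on the bookSerializer/bookDeserializer definitions above.
def bookProducer: Resource[IO, KafkaProducer[IO, String, Book]] =
  bookSerializer[IO].flatMap { bookValueSerializer =>
    val settings =
      ProducerSettings(
        summon[KeySerializer[IO, String]], // fs2-kafka's built-in String serializer
        bookValueSerializer                // the JSON Schema value serializer from above
      ).withBootstrapServers("localhost:9092")
    KafkaProducer.resource(settings)
  }

def bookConsumer: Resource[IO, KafkaConsumer[IO, String, Book]] =
  bookDeserializer[IO].flatMap { bookValueDeserializer =>
    val settings =
      ConsumerSettings(
        summon[KeyDeserializer[IO, String]],
        bookValueDeserializer
      )
        .withBootstrapServers("localhost:9092")
        .withGroupId("book-readers")
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
    KafkaConsumer.resource(settings)
  }
```

From there, publishing and consuming work as in any fs2-kafka application, e.g. `producer.produce(ProducerRecords.one(ProducerRecord("books", "isbn-1", book)))` on the producer side and `consumer.subscribeTo("books") >> consumer.stream` on the consumer side; the underlying Confluent serializer handles registering and validating the JSON schema against the configured registry, subject to its settings.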
3 changes: 3 additions & 0 deletions build.sbt
@@ -14,7 +14,10 @@ inThisBuild {
ScalacOptions.unchecked,
ScalacOptions.deprecation,
ScalacOptions.warnValueDiscard,
ScalacOptions.warnUnusedImports,
ScalacOptions.warnDeadCode,
ScalacOptions.warnUnusedImplicits,
ScalacOptions.warnUnusedExplicits,
ScalacOptions.release("17"),
ScalacOptions.privateKindProjector
)
@@ -9,12 +9,9 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.json.{JsonSchema, JsonSchemaUtils}
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
import sttp.apispec.circe.*
import sttp.apispec.{ExampleSingleValue, SchemaType}
import sttp.tapir.docs.apispec.schema.*
import sttp.tapir.json.pickler.Pickler

import scala.jdk.CollectionConverters.*

private[jsonschema] object JsonSchemaSerializer:
def create[F[_], A](
isKey: Boolean,
@@ -2,7 +2,6 @@ package io.kaizensolutions.jsonschema
import cats.effect.{Resource, Sync}
import com.fasterxml.jackson.databind.ObjectMapper
import fs2.kafka.*
import io.circe.generic.auto
import io.confluent.kafka.schemaregistry.SchemaProvider
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.schemaregistry.json.{JsonSchemaProvider, SpecificationVersion}
@@ -4,18 +4,15 @@ import cats.effect.*
import cats.syntax.all.*
import fs2.Stream
import fs2.kafka.*
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider
import io.confluent.kafka.schemaregistry.{CompatibilityLevel, SchemaProvider}
import io.github.embeddedkafka.schemaregistry.*
import org.apache.kafka.common.errors.{InvalidConfigurationException, SerializationException as UnderlyingSerializationException}
import sttp.tapir.Schema.annotations.*
import sttp.tapir.json.pickler.Pickler
import weaver.*

import java.io.File
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*

object JsonSchemaSerdesSpec extends IOSuite:
