This repository has been archived by the owner on May 25, 2023. It is now read-only.

Scalish #66 (Open)
Wants to merge 10 commits into base branch develop. Changes shown from 5 of the 10 commits.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -3,7 +3,7 @@ sudo: false
 jdk: oraclejdk8
 scala:
   - 2.11.11
-  - 2.12.4
+  - 2.12.5
 sbt_args: -mem 2000
 script:
   - sbt "++ ${TRAVIS_SCALA_VERSION}!" test
2 changes: 1 addition & 1 deletion project/Versions.scala
@@ -7,7 +7,7 @@ object Versions {
   val CuratorVersion = "4.0.0"
   val MinitestVersion = "2.0.0"
   val JDKVersion = "1.8"
-  val Scala_2_12_Version = "2.12.4"
+  val Scala_2_12_Version = "2.12.5"
   val Scala_2_11_Version = "2.11.11"
   val Avro4sVersion = "1.8.3"
   val CrossScalaVersions = Seq(Scala_2_12_Version, Scala_2_11_Version)
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=1.1.0
+sbt.version=1.1.2
New file (89 additions):
@@ -0,0 +1,89 @@
package com.lightbend.kafka.scala.streams

import java.util.Properties

import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.common.{Metric, MetricName}
import org.apache.kafka.streams.processor.{StateRestoreListener, StreamPartitioner, ThreadMetadata}
import org.apache.kafka.streams.state.{QueryableStoreType, StreamsMetadata}
import org.apache.kafka.streams.{KafkaClientSupplier, KafkaStreams, StreamsConfig, Topology}

import scala.collection.JavaConverters._

/** Thin Scala wrapper around KafkaStreams: Java collections are converted to
  * Scala ones, and the Java `set*` mutators become chainable `with*` methods
  * returning `this`. */
class KafkaStreamsS(inner: KafkaStreams) {

  def allMetadata(): Iterable[StreamsMetadata] =
    inner.allMetadata().asScala

  def allMetadataForStore(storeName: String): Iterable[StreamsMetadata] =
    inner.allMetadataForStore(storeName).asScala

  def cleanUp(): KafkaStreamsS = {
    inner.cleanUp()
    this
  }

  def close(): Unit =
    inner.close()

  def close(timeout: Long, timeUnit: java.util.concurrent.TimeUnit): Boolean =
    inner.close(timeout, timeUnit)

  def localThreadsMetadata(): Set[ThreadMetadata] =
    inner.localThreadsMetadata.asScala.toSet

  def metadataForKey[K](storeName: String, key: K, keySerializer: Serializer[K]): StreamsMetadata =
    inner.metadataForKey(storeName, key, keySerializer)

  def metadataForKey[K](storeName: String, key: K, partitioner: StreamPartitioner[_ >: K, _]): StreamsMetadata =
    inner.metadataForKey(storeName, key, partitioner)

  def metrics(): Map[MetricName, _ <: Metric] =
    inner.metrics().asScala.toMap

  def withGlobalStateRestoreListener(globalStateRestoreListener: StateRestoreListener): KafkaStreamsS = {
    inner.setGlobalStateRestoreListener(globalStateRestoreListener)
    this
  }

  def withStateListener(listener: KafkaStreams.StateListener): KafkaStreamsS = {
    inner.setStateListener(listener)
    this
  }

  def withUncaughtExceptionHandler(eh: Thread.UncaughtExceptionHandler): KafkaStreamsS = {
    inner.setUncaughtExceptionHandler(eh)
    this
  }

  def start(): KafkaStreamsS = {
    inner.start()
    this
  }

  def state(): KafkaStreams.State =
    inner.state()

  def store[T](storeName: String, queryableStoreType: QueryableStoreType[T]): T =
    inner.store(storeName, queryableStoreType)
}

object KafkaStreamsS {
  def apply(s: StreamsBuilderS, p: Properties): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(s.build(), p))

  def apply(topology: Topology, p: Properties): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(topology, p))

  def apply(topology: Topology, config: StreamsConfig): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(topology, config))

  def apply(topology: Topology, config: StreamsConfig, clientSupplier: KafkaClientSupplier): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(topology, config, clientSupplier))
}
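For reference, a minimal sketch of how the wrapper's chainable setters compose. The config values and topology wiring here are placeholders, not part of this PR, and it assumes StreamsBuilderS can be constructed directly as the tests below do with their `builder`:

// Hypothetical usage sketch; `props` values and the topology are illustrative only.
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val builder = new StreamsBuilderS()
// ... define the topology on `builder` ...

// Each with* setter returns this, so configuration and startup chain into one expression.
val streams = KafkaStreamsS(builder, props)
  .withStateListener((newState, oldState) => println(s"$oldState -> $newState"))
  .withUncaughtExceptionHandler((t: Thread, e: Throwable) => e.printStackTrace())
  .start()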
@@ -29,10 +29,11 @@ class KafkaLocalServer private (kafkaProperties: Properties, zooKeeperServer: Zo
   private var zkUtils: ZkUtils =
     ZkUtils.apply(s"localhost:${zooKeeperServer.getPort()}", DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, false)

-  def start(): Unit = {
+  def start(): KafkaLocalServer = {
     broker = KafkaServerStartable.fromProps(kafkaProperties)
     broker.startup()
+    this
   }

   //scalastyle:off null
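The effect of start() returning this rather than Unit, taken directly from the test diffs that follow:

// Before: start() returned Unit, so setup needed a temporary val
val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
s.start()
s

// After: start() returns this, collapsing setup to a single expression
KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()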
@@ -11,16 +11,14 @@ import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, Mess
 import minitest.TestSuite
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization._
-import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
+import org.apache.kafka.streams.{KeyValue, StreamsConfig}
 import ImplicitConversions._
 import com.typesafe.scalalogging.LazyLogging

 object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData with LazyLogging {

   override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
   }

   override def tearDown(server: KafkaLocalServer): Unit = {
@@ -57,8 +55,8 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestDa

     wordCounts.toStream.to(outputTopic)

-    val streams = new KafkaStreams(builder.build(), streamsConfiguration)
-    streams.start()
+    val streams = KafkaStreamsS(builder, streamsConfiguration).start()

     //
     // Step 2: Produce some input data to the input topic.
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.streams.kstream.Transformer
 import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
+import org.apache.kafka.streams.{KeyValue, StreamsConfig}
 import ImplicitConversions._
 import com.typesafe.scalalogging.LazyLogging

@@ -70,9 +70,7 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalSer
     with ProbabilisticCountingScalaIntegrationTestData {

   override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
   }

   override def tearDown(server: KafkaLocalServer): Unit = {
@@ -151,8 +149,7 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalSer
       .transform(() => new ProbabilisticCounter, cmsStoreName)
       .to(outputTopic)

-    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
-    streams.start()
+    val streams: KafkaStreamsS = KafkaStreamsS(builder, streamsConfiguration).start()

     // Step 2: Publish some input text lines.
     val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
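For context on the .transform(() => new ProbabilisticCounter, cmsStoreName) call above: a Transformer pairs per-record logic with a named state store. A minimal sketch against the Kafka 1.x API; this WordCounter and its store access are illustrative, not the PR's ProbabilisticCounter:

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Illustrative transformer: counts occurrences of each value in a key-value store.
class WordCounter(storeName: String) extends Transformer[String, String, KeyValue[String, Long]] {
  private var store: KeyValueStore[String, java.lang.Long] = _

  override def init(context: ProcessorContext): Unit =
    store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, java.lang.Long]]

  override def transform(key: String, value: String): KeyValue[String, Long] = {
    val next = Option(store.get(value)).map(_.longValue + 1L).getOrElse(1L)
    store.put(value, next)
    new KeyValue(value, next)
  }

  // Deprecated in Kafka 1.x (removed in 2.0); a no-op here.
  override def punctuate(timestamp: Long): KeyValue[String, Long] = null

  override def close(): Unit = ()
}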
@@ -11,7 +11,7 @@ import com.typesafe.scalalogging.LazyLogging
 import minitest.TestSuite
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, PunctuationType}
-import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
+import org.apache.kafka.streams.{StreamsConfig, Topology}

 /**
  * This sample demonstrates the usage of punctuate, which changed significantly in version 1.0 and …
@@ -23,9 +23,7 @@ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
 object PunctuateTest extends TestSuite[KafkaLocalServer] with PunctuateTestData with LazyLogging {

   override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
   }

   override def tearDown(server: KafkaLocalServer): Unit = {
@@ -53,8 +51,7 @@ object PunctuateTest extends TestSuite[KafkaLocalServer] with PunctuateTestData
     topology.addSource("data", inputTopic)
     // Processors
     topology.addProcessor("data processor", () => new SampleProcessor(5000), "data")
-    val streams = new KafkaStreams(topology, streamsConfiguration)
-    streams.start()
+    val streams = KafkaStreamsS(topology, streamsConfiguration).start()
     // Allow time for the streams to start up
     Thread.sleep(5000L)
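Since the doc comment above notes that punctuation changed significantly in 1.0: a sketch of the 1.0-style scheduling API that a processor like SampleProcessor would presumably use. The body is illustrative; the PR does not show SampleProcessor itself:

import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, PunctuationType, Punctuator}

// Illustrative processor; only the schedule() call reflects the 1.0 API change.
class SampleProcessorSketch(intervalMs: Long) extends AbstractProcessor[String, String] {
  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    // As of 1.0, punctuation is scheduled explicitly, with wall-clock or
    // stream-time semantics chosen via PunctuationType.
    context.schedule(intervalMs, PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      override def punctuate(timestamp: Long): Unit =
        println(s"punctuate fired at $timestamp")
    })
  }

  override def process(key: String, value: String): Unit = () // per-record work goes here
}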
@@ -47,9 +47,7 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdes extends TestSuite[Kaf
     with StreamToTableJoinTestData with LazyLogging {

   override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
   }

   override def tearDown(server: KafkaLocalServer): Unit = {
@@ -106,21 +104,19 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdes extends TestSuite[Kaf
     // Write the (continuously updating) results to the output topic.
     clicksPerRegion.toStream.to(outputTopic)

-    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
+    val streams: KafkaStreamsS = KafkaStreamsS(builder, streamsConfiguration)

-    streams.setUncaughtExceptionHandler((_: Thread, e: Throwable) => try {
+    streams.withUncaughtExceptionHandler((_: Thread, e: Throwable) => try {
       logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
       e.printStackTrace()
-      val closed: Unit = streams.close()
+      val closed = streams.close()
       logger.info(s"Exiting application after streams close ($closed)")
     } catch {
       case x: Exception => x.printStackTrace()
     } finally {
       logger.debug("Exiting application ..")
       System.exit(-1)
-    })
-
-    streams.start()
+    }).start()

     //
     // Step 2: Publish user-region information.
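A side note on the handler literal above: passing (_: Thread, e: Throwable) => ... where Thread.UncaughtExceptionHandler is expected relies on Scala's Java-SAM conversion, which is standard in 2.12; on 2.11, which this build also cross-compiles, it typically requires -Xexperimental. A minimal illustration:

// The function literal is converted to the single-abstract-method Java interface.
val handler: Thread.UncaughtExceptionHandler =
  (t: Thread, e: Throwable) => Console.err.println(s"${t.getName} died: ${e.getMessage}")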
@@ -30,9 +30,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.streams._
 import ImplicitConversions._
+import com.typesafe.scalalogging.LazyLogging

 object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro
-  extends TestSuite[KafkaLocalServer] with StreamToTableJoinTestData {
+  extends TestSuite[KafkaLocalServer] with StreamToTableJoinTestData with LazyLogging {

   case class UserClicks(clicks: Long)

@@ -82,9 +83,7 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro

   override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(true, Some(localStateDir))
-    s.start()
-    s
+    KafkaLocalServer(true, Some(localStateDir)).start()
   }

   override def tearDown(server: KafkaLocalServer): Unit = {
@@ -141,28 +140,21 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro
     // Write the (continuously updating) results to the output topic.
     clicksPerRegion.toStream.to(outputTopic)

-    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
-
-    streams
-      .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+    val streams: KafkaStreamsS = KafkaStreamsS(builder.build(), streamsConfiguration)
+    streams.withUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
       override def uncaughtException(t: Thread, e: Throwable): Unit = try {
-        println(
+        logger.error(
           s"Stream terminated because of uncaught exception .. Shutting " +
-            s"down app",
-          e)
-        e.printStackTrace
+            s"down app", e)
         val closed = streams.close()
-        println(s"Exiting application after streams close ($closed)")
+        logger.debug(s"Exiting application after streams close ($closed)")
       } catch {
         case x: Exception => x.printStackTrace
       } finally {
-        println("Exiting application ..")
+        logger.debug("Exiting application ..")
         System.exit(-1)
       }
-    })
-
-    streams.start()
+    }).start()

     //
     // Step 2: Publish user-region information.

Review comment on the logger change above, from @joan38 (Contributor), May 3, 2018: Way better 👍