Commit c4691dc

Fix missed notifications in KafkaSensor (#410)
* Add unittest for KafkaSensor
* Generalize test case
* Fix current implementation
* Implement group id per sensor and consume from last available offset
* Few refactorings in KafkaSensor
* Only consume from latest when sensor has been activated, while scheduler was already running
* Fix configs
1 parent f4bd280 commit c4691dc

File tree: 11 files changed, +325 −101 lines


pom.xml

Lines changed: 7 additions & 0 deletions

@@ -88,6 +88,7 @@
     <slick-hikaricp.version>3.3.1</slick-hikaricp.version>
     <postgresql>42.2.6</postgresql>
     <kafka.version>2.2.0</kafka.version>
+    <embedded.kafka.version>2.2.0</embedded.kafka.version>
     <play-json.version>2.7.3</play-json.version>
     <play-ws-standalone.version>2.0.3</play-ws-standalone.version>
     <play-ahc-ws-standalone.version>2.0.2</play-ahc-ws-standalone.version>
@@ -259,6 +260,12 @@
       <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.github.embeddedkafka</groupId>
+      <artifactId>embedded-kafka_2.11</artifactId>
+      <version>${embedded.kafka.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <resources>
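
The new test-scoped dependency brings an in-process Kafka broker into the build for the KafkaSensor unit test. A minimal sketch of how embedded-kafka is typically driven from ScalaTest, assuming the net.manub.embeddedkafka package used by the 2.2.x line; the class name, ports, topic, and message below are illustrative assumptions, not taken from this commit:

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{FlatSpec, Matchers}

class KafkaSensorIntegrationTest extends FlatSpec with Matchers with EmbeddedKafka {

  // Fixed ports so the consumer under test can be pointed at localhost:6001.
  implicit val kafkaConfig: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 6001, zooKeeperPort = 6000)

  "KafkaSensor" should "see a notification published to its topic" in {
    withRunningKafka {
      // Broker and ZooKeeper are only up inside this block.
      publishStringMessageToKafka("notification-topic", """{"notificationMessage": "awaiting"}""")
      // ... start the sensor against localhost:6001 and assert the event processor fired
    }
  }
}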

src/main/resources/application.properties

Lines changed: 2 additions & 2 deletions

@@ -53,11 +53,11 @@ scheduler.sensors.changedSensorsChunkQuerySize=100
 scheduler.executors.executablesFolder=/
 
 #Kafka sensor properties.
-kafkaSource.group.id=hyper_drive_${appUniqueId}
+kafkaSource.group.id.prefix=hyper_drive
 kafkaSource.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
 kafkaSource.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
 kafkaSource.poll.duration=500
-kafkaSource.max.poll.records=3
+kafkaSource.max.poll.records=100
 kafkaSource.properties.enable.auto.commit=false
 kafkaSource.properties.auto.offset.reset=latest
 kafkaSource.properties.security.protocol=
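
The fixed per-application group id becomes a prefix: each KafkaSensor now derives its own group id from the prefix, the appUniqueId, and its sensor id (see KafkaConfig.getBaseGroupId and the KafkaSensor constructor below), and max.poll.records rises from 3 to 100, presumably so one poll can drain a realistic backlog. Roughly, under these properties:

// Illustrative composition only; appUniqueId value is from the test config, sensorId is hypothetical.
val prefix      = "hyper_drive"                           // kafkaSource.group.id.prefix
val appUniqueId = "9c282190-4078-4380-8960-ce52f43b94fg"  // appUniqueId property
val sensorId    = 17L
val groupId     = s"${prefix}_$appUniqueId-$sensorId"
// => "hyper_drive_9c282190-4078-4380-8960-ce52f43b94fg-17"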

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/JobScheduler.scala

Lines changed: 8 additions & 6 deletions

@@ -55,13 +55,15 @@ class JobScheduler @Inject()(sensors: Sensors, executors: Executors, dagInstance
   def startManager(): Unit = {
     logger.info("Starting Manager")
     if (!isManagerRunningAtomic.get() && runningScheduler.isCompleted) {
-      sensors.prepareSensors()
       isManagerRunningAtomic.set(true)
+      sensors.prepareSensors()
+      var firstIteration = true
       runningScheduler =
         Future {
           while (isManagerRunningAtomic.get()) {
             logger.debug("Running manager heart beat.")
-            assignWorkflows()
+            assignWorkflows(firstIteration)
+            firstIteration = false
             Thread.sleep(HEART_BEAT)
           }
         }
@@ -86,7 +88,7 @@ class JobScheduler @Inject()(sensors: Sensors, executors: Executors, dagInstance
     !runningScheduler.isCompleted
   }
 
-  private def assignWorkflows(): Unit = {
+  private def assignWorkflows(firstIteration: Boolean): Unit = {
     if (runningAssignWorkflows.isCompleted) {
       runningAssignWorkflows = workflowBalancer.getAssignedWorkflows(runningDags.keys.map(_.workflowId).toSeq)
         .recover {
@@ -98,7 +100,7 @@ class JobScheduler @Inject()(sensors: Sensors, executors: Executors, dagInstance
         .map(_.map(_.id))
         .map { assignedWorkflowIds =>
           removeFinishedDags()
-          processEvents(assignedWorkflowIds)
+          processEvents(assignedWorkflowIds, firstIteration)
           enqueueDags(assignedWorkflowIds)
         }
     }
@@ -122,9 +124,9 @@ class JobScheduler @Inject()(sensors: Sensors, executors: Executors, dagInstance
     }
   }
 
-  private def processEvents(assignedWorkflowIds: Seq[Long]): Unit = {
+  private def processEvents(assignedWorkflowIds: Seq[Long], firstIteration: Boolean): Unit = {
     if (runningSensors.isCompleted) {
-      runningSensors = sensors.processEvents(assignedWorkflowIds)
+      runningSensors = sensors.processEvents(assignedWorkflowIds, firstIteration)
       runningSensors.onComplete {
         case Success(_) =>
           logger.debug("Running sensors finished successfully.")

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/Sensors.scala

Lines changed: 14 additions & 11 deletions

@@ -15,12 +15,8 @@
 
 package za.co.absa.hyperdrive.trigger.scheduler.sensors
 
-import java.util.concurrent.Executors
-
-import javax.inject.Inject
 import org.slf4j.LoggerFactory
 import org.springframework.stereotype.Component
-import za.co.absa.hyperdrive.trigger.models.Workflow
 import za.co.absa.hyperdrive.trigger.models.enums.SensorTypes
 import za.co.absa.hyperdrive.trigger.persistance.{DagInstanceRepository, SensorRepository}
 import za.co.absa.hyperdrive.trigger.scheduler.eventProcessor.EventProcessor
@@ -29,6 +25,8 @@ import za.co.absa.hyperdrive.trigger.scheduler.sensors.recurring.RecurringSensor
 import za.co.absa.hyperdrive.trigger.scheduler.sensors.time.{TimeSensor, TimeSensorQuartzSchedulerManager}
 import za.co.absa.hyperdrive.trigger.scheduler.utilities.SensorsConfig
 
+import java.util.concurrent.Executors
+import javax.inject.Inject
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
 import scala.util.{Failure, Success, Try}
@@ -42,13 +40,13 @@ class Sensors @Inject()(eventProcessor: EventProcessor, sensorRepository: Sensor
 
   private val sensors: mutable.Map[Long, Sensor] = mutable.Map.empty[Long, Sensor]
 
-  def processEvents(assignedWorkflowIds: Seq[Long]): Future[Unit] = {
+  def processEvents(assignedWorkflowIds: Seq[Long], firstIteration: Boolean): Future[Unit] = {
     logger.debug(s"Processing events. Sensors: ${sensors.keys}")
     removeReleasedSensors(assignedWorkflowIds)
     val fut = for {
       _ <- removeInactiveSensors()
       _ <- updateChangedSensors()
-      _ <- addNewSensors(assignedWorkflowIds)
+      _ <- addNewSensors(assignedWorkflowIds, firstIteration)
       _ <- pollEvents()
     } yield {
       (): Unit
@@ -65,21 +63,25 @@
   }
 
   def prepareSensors(): Unit = {
+    logger.info("Preparing sensors")
     TimeSensorQuartzSchedulerManager.start()
   }
 
   def cleanUpSensors(): Unit = {
+    logger.info("Cleaning up sensors")
     sensors.values.foreach(_.close())
     sensors.clear()
 
     TimeSensorQuartzSchedulerManager.stop()
   }
 
   private def updateChangedSensors(): Future[Unit] = {
+    val kafkaSensorConsumeFromLatest = false // by construction, this query never returns sensor that changed its
+    // activation state, therefore the consumer never has to consume from the latest
     sensorRepository.getChangedSensors(sensors.values.map(_.sensorDefinition).toSeq).map(
       _.foreach { sensor =>
         stopSensor(sensor.id)
-        startSensor(sensor)
+        startSensor(sensor, kafkaSensorConsumeFromLatest)
       }
     )
   }
@@ -102,17 +104,18 @@
     sensors.remove(id)
   }
 
-  private def addNewSensors(assignedWorkflowIds: Seq[Long]): Future[Unit] = {
+  private def addNewSensors(assignedWorkflowIds: Seq[Long], firstIteration: Boolean): Future[Unit] = {
     val activeSensors = sensors.keys.toSeq
     sensorRepository.getNewActiveAssignedSensors(activeSensors, assignedWorkflowIds).map {
-      _.foreach(sensor => startSensor(sensor))
+      _.foreach(sensor => startSensor(sensor, kafkaSensorConsumeFromLatest = !firstIteration))
     }
   }
 
-  private def startSensor(sensor: za.co.absa.hyperdrive.trigger.models.Sensor) = sensor match {
+  private def startSensor(sensor: za.co.absa.hyperdrive.trigger.models.Sensor, kafkaSensorConsumeFromLatest: Boolean) = sensor match {
     case sensor if sensor.sensorType == SensorTypes.Kafka || sensor.sensorType == SensorTypes.AbsaKafka =>
 
-      Try(new KafkaSensor(eventProcessor.eventProcessor(s"Sensor - ${sensor.sensorType.name}"), sensor, executionContext)) match {
+      Try(new KafkaSensor(eventProcessor.eventProcessor(s"Sensor - ${sensor.sensorType.name}"), sensor,
+        kafkaSensorConsumeFromLatest, executionContext)) match {
        case Success(s) => sensors.put(sensor.id, s)
        case Failure(f) => logger.error(s"Couldn't create Kafka sensor for sensor (#${sensor.id}).", f)
      }
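
Two paths call startSensor with different consume-from-latest values: restarted (changed) sensors always keep their position, and brand-new sensors skip the backlog only if the scheduler was already running when they appeared. The decision reduces to a single predicate — a hypothetical helper, written out here only to make the truth table explicit:

// Hypothetical helper summarising the two call sites above; not in the commit.
def consumeFromLatest(isNewSensor: Boolean, firstIteration: Boolean): Boolean =
  isNewSensor && !firstIteration

consumeFromLatest(isNewSensor = false, firstIteration = false) // changed sensor: false, keep offsets
consumeFromLatest(isNewSensor = true,  firstIteration = true)  // found at startup: false, resume from commits
consumeFromLatest(isNewSensor = true,  firstIteration = false) // activated later: true, seek to log end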

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/kafka/KafkaSensor.scala

Lines changed: 32 additions & 12 deletions

@@ -15,24 +15,24 @@
 
 package za.co.absa.hyperdrive.trigger.scheduler.sensors.kafka
 
-import java.time.Duration
-import java.util.Collections
-
-import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+import org.slf4j.LoggerFactory
 import play.api.libs.json.{JsError, JsSuccess, Json}
 import za.co.absa.hyperdrive.trigger.models.{Event, Properties, Sensor}
 import za.co.absa.hyperdrive.trigger.scheduler.sensors.PollSensor
 import za.co.absa.hyperdrive.trigger.scheduler.utilities.KafkaConfig
 
+import java.time.Duration
+import java.util
+import java.util.Collections
 import scala.concurrent.{ExecutionContext, Future}
-import za.co.absa.hyperdrive.trigger.scheduler.utilities.KafkaRichConsumer._
-import org.slf4j.LoggerFactory
-
 import scala.util.{Failure, Success, Try}
 
 class KafkaSensor(
   eventsProcessor: (Seq[Event], Properties) => Future[Boolean],
   sensorDefinition: Sensor,
+  consumeFromLatest: Boolean = false,
   executionContext: ExecutionContext
 ) extends PollSensor(eventsProcessor, sensorDefinition, executionContext) {
 
@@ -41,18 +41,35 @@
   private val logMsgPrefix = s"Sensor id = ${properties.sensorId}."
   private val kafkaSettings = KafkaSettings(properties.settings)
 
-  private val consumer = new KafkaConsumer[String, String](KafkaConfig.getConsumerProperties(kafkaSettings))
+  private val consumer = {
+    val consumerProperties = KafkaConfig.getConsumerProperties(kafkaSettings)
+    val groupId = s"${KafkaConfig.getBaseGroupId}-${properties.sensorId}"
+    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+
+    new KafkaConsumer[String, String](consumerProperties)
+  }
 
   try {
-    consumer.subscribe(Collections.singletonList(kafkaSettings.topic))
+    consumer.subscribe(Collections.singletonList(kafkaSettings.topic), new ConsumerRebalanceListener {
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
+        // no-op
+      }
+
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
+        if (consumeFromLatest) {
+          consumer.seekToEnd(partitions)
+        }
+      }
+    })
   } catch {
     case e: Exception => logger.debug(s"$logMsgPrefix. Exception during subscribe.", e)
   }
 
   override def poll(): Future[Unit] = {
+    import scala.collection.JavaConverters._
     logger.debug(s"$logMsgPrefix. Polling new events.")
     val fut = Future {
-      consumer.pollAsScala(Duration.ofMillis(KafkaConfig.getPollDuration))
+      consumer.poll(Duration.ofMillis(KafkaConfig.getPollDuration)).asScala
     } flatMap processRecords map (_ => consumer.commitSync())
 
     fut.onComplete {
@@ -77,9 +94,12 @@
         }
       }
     }
-      eventsProcessor.apply(matchedEvents, properties).map(_ => (): Unit)
-    } else
+      matchedEvents.headOption.map(matchedEvent => {
+        eventsProcessor.apply(Seq(matchedEvent), properties).map(_ => (): Unit)
+      }).getOrElse(Future.successful((): Unit))
+    } else {
       Future.successful((): Unit)
+    }
   }
 
   private def recordToEvent[A](record: ConsumerRecord[A, String]): Event = {
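
The heart of the fix is where seekToEnd happens: subscribe() assigns no partitions by itself, so seeking right after it would be a no-op; partitions only arrive during a later poll, and onPartitionsAssigned is the callback that fires at exactly that moment. A self-contained sketch of the pattern against a plain Kafka consumer — broker address, group id, and topic are placeholders, not values from this commit:

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

import java.time.Duration
import java.util
import java.util.{Collections, Properties}

object SeekToEndExample extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")           // placeholder group id
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)

  // seekToEnd must run after assignment; subscribe() alone assigns nothing yet.
  consumer.subscribe(Collections.singletonList("example-topic"), new ConsumerRebalanceListener {
    override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = ()
    override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit =
      consumer.seekToEnd(partitions) // skip the backlog: start at the current log end
  })

  // The listener fires inside poll(), once the group coordinator assigns partitions.
  val records = consumer.poll(Duration.ofMillis(500))
  consumer.close()
}

Note also that processRecords now forwards at most one matched event per poll (headOption) instead of the whole batch, presumably so a burst of matching notifications does not enqueue duplicate runs in a single heartbeat.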

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/utilities/Configs.scala

Lines changed: 12 additions & 9 deletions

@@ -15,13 +15,12 @@
 
 package za.co.absa.hyperdrive.trigger.scheduler.utilities
 
-import java.io.File
-import java.util.UUID.randomUUID
-import java.util.Properties
-
 import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import za.co.absa.hyperdrive.trigger.scheduler.sensors.kafka.KafkaSettings
 
+import java.io.File
+import java.util.Properties
 import scala.collection.JavaConverters._
 import scala.util.Try
 
@@ -47,13 +46,15 @@
 }
 
 object KafkaConfig {
+  private val keyDeserializer = Configs.conf.getString("kafkaSource.key.deserializer")
+  private val valueDeserializer = Configs.conf.getString("kafkaSource.value.deserializer")
+  private val maxPollRecords = Configs.conf.getString("kafkaSource.max.poll.records")
   def getConsumerProperties(kafkaSettings: KafkaSettings): Properties = {
     val properties = new Properties()
-    properties.put("bootstrap.servers", kafkaSettings.servers.mkString(","))
-    properties.put("group.id", randomUUID().toString)
-    properties.put("key.deserializer", Configs.conf.getString("kafkaSource.key.deserializer"))
-    properties.put("value.deserializer", Configs.conf.getString("kafkaSource.value.deserializer"))
-    properties.put("max.poll.records", Configs.conf.getString("kafkaSource.max.poll.records"))
+    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSettings.servers.mkString(","))
+    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
+    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
+    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
 
     Configs.getMapFromConf("kafkaSource.properties").foreach { case (key, value) =>
       properties.put(key, value)
@@ -62,6 +63,8 @@
     properties
   }
 
+  val getBaseGroupId: String =
+    s"${Configs.conf.getString("kafkaSource.group.id.prefix")}_${Configs.conf.getString("appUniqueId")}"
   val getPollDuration: Long =
     Configs.conf.getLong("kafkaSource.poll.duration")
 }
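
This is the crux of the missed-notification bug: the old code gave every consumer a randomUUID() group id, and with kafkaSource.properties.auto.offset.reset=latest a fresh group always begins at the log end, so anything published while a sensor was down (or between consumer re-creations) was silently skipped. A stable per-sensor group id lets the consumer commit and resume offsets instead. Usage, mirroring the KafkaSensor constructor above (sensorId is a placeholder):

// Sketch of how the reworked helpers are consumed; see KafkaSensor.scala above.
val sensorId = 17L // placeholder
val props = KafkaConfig.getConsumerProperties(kafkaSettings)
props.put(ConsumerConfig.GROUP_ID_CONFIG, s"${KafkaConfig.getBaseGroupId}-$sensorId")
val consumer = new KafkaConsumer[String, String](props)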

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/utilities/KafkaRichConsumer.scala

Lines changed: 0 additions & 47 deletions
This file was deleted.

src/test/resources/application.properties

Lines changed: 3 additions & 0 deletions

@@ -23,6 +23,9 @@ appUniqueId=9c282190-4078-4380-8960-ce52f43b94fg
 scheduler.executors.executablesFolder=src/test/resources/
 scheduler.sensors.changedSensorsChunkQuerySize=100
 
+kafkaSource.group.id.prefix=hyper_drive
+kafkaSource.properties.security.protocol=PLAINTEXT
+
 db.connectionPool=disabled
 db.url=jdbc:h2:mem:hyperdriver;DATABASE_TO_UPPER=false
 db.driver=org.h2.Driver
