Skip to content

Commit 8d1abd6

Browse files
Don't skip job instance if topic doesn't exist (#713)
1 parent 7c2d7d7 commit 8d1abd6

File tree

2 files changed

+4
-7
lines changed

2 files changed

+4
-7
lines changed

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,8 @@ class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig,
8282
val isNewJobInstanceRequiredFut = kafkaEndOffsetsOptFut.flatMap { kafkaEndOffsetsOpt =>
8383
kafkaBeginningOffsetsOptFut.flatMap { kafkaBeginningOffsetsOpt =>
8484
(kafkaBeginningOffsetsOpt, kafkaEndOffsetsOpt) match {
85-
case (Some(kafkaBeginningOffsets), Some(kafkaEndOffsets)) =>
86-
if (kafkaBeginningOffsets.isEmpty) {
87-
logger.info(s"Topic ${kafkaParametersOpt.get._1} does not exist. Skipping job instance")
88-
Future { false }
89-
} else if (offsetsEqual(kafkaBeginningOffsets, kafkaEndOffsets)) {
85+
case (Some(kafkaBeginningOffsets), Some(kafkaEndOffsets)) if kafkaBeginningOffsets.nonEmpty =>
86+
if (offsetsEqual(kafkaBeginningOffsets, kafkaEndOffsets)) {
9087
logger.info(s"Topic ${kafkaParametersOpt.get._1} is empty. Skipping job instance")
9188
Future { false }
9289
} else {

src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonServiceTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers
126126
}
127127
}
128128

129-
it should "return false if the kafka topic does not exist" in {
129+
it should "return true if the kafka topic does not exist" in {
130130
val config = getSparkConfig
131131
val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService)
132132
val jobParameters = getJobParameters
@@ -138,7 +138,7 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers
138138

139139
resultFut.map { result =>
140140
verify(checkpointService, never()).getLatestOffsetFilePath(any())(any())
141-
result shouldBe false
141+
result shouldBe true
142142
}
143143
}
144144

0 commit comments

Comments
 (0)