Skip to content

Commit 553b052

Browse files
authored
Feature/789 show messages to ingest on hyperdrive jobs (#791)
#789 show messages to ingest on hyperdrive jobs
1 parent 8a4cef9 commit 553b052

25 files changed

+1014
-42
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.api.rest.controllers
17+
18+
import org.springframework.web.bind.annotation._
19+
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveService
20+
import za.co.absa.hyperdrive.trigger.models._
21+
22+
import java.util.concurrent.CompletableFuture
23+
import javax.inject.Inject
24+
import scala.compat.java8.FutureConverters._
25+
import scala.concurrent.ExecutionContext.Implicits.global
26+
27+
/**
 * REST controller exposing the ingestion status of the jobs of a workflow.
 *
 * @param hyperdriveService service used to compute the ingestion status
 */
@RestController
class HyperdriveController @Inject() (hyperdriveService: HyperdriveService) {

  /**
   * Returns the ingestion status of every job of the given workflow.
   *
   * @param id id of the workflow, taken from the request path
   * @return the statuses produced by the service, exposed as a CompletableFuture
   */
  @GetMapping(path = Array("/hyperdrive/workflows/{id}/ingestionStatus"))
  def getIngestionStatus(@PathVariable id: Long): CompletableFuture[Seq[IngestionStatus]] = {
    val ingestionStatuses = hyperdriveService.getIngestionStatus(id)
    ingestionStatuses.toJava.toCompletableFuture
  }
}

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.springframework.stereotype.Service
2525
import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap
2626

2727
import javax.inject.Inject
28-
import scala.util.Try
28+
import scala.util.{Success, Try}
2929

3030
trait CheckpointService {
3131
type TopicPartitionOffsets = Map[String, Map[Int, Long]]
@@ -34,6 +34,10 @@ trait CheckpointService {
3434
def getLatestOffsetFilePath(params: HdfsParameters)(
3535
implicit ugi: UserGroupInformation
3636
): Try[Option[(String, Boolean)]]
37+
38+
def getLatestCommittedOffset(params: HdfsParameters)(
39+
implicit ugi: UserGroupInformation
40+
): Try[Option[TopicPartitionOffsets]]
3741
}
3842

3943
class HdfsParameters(
@@ -98,6 +102,16 @@ class CheckpointServiceImpl @Inject() (@Lazy hdfsService: HdfsService) extends C
98102
}
99103
}
100104

105+
/**
 * Reads the offsets belonging to the latest committed batch of a checkpoint.
 *
 * @param params parameters providing the checkpoint location
 * @param ugi user on whose behalf the checkpoint is accessed
 * @return the offsets read from the offset file of the latest commit, Success(None) if no commit was found,
 *         or the Failure reported while looking up the commit or reading the file
 */
override def getLatestCommittedOffset(
  params: HdfsParameters
)(implicit ugi: UserGroupInformation): Try[Option[TopicPartitionOffsets]] =
  getLatestCommitBatchId(params.checkpointLocation).flatMap {
    case Some(latestCommit) =>
      // The offset file is named after the id of the batch it belongs to.
      val offsetFile = new Path(s"${params.checkpointLocation}/$offsetsDirName/$latestCommit")
      getOffsetsFromFile(offsetFile.toString)
    case None =>
      Success(None)
  }
114+
101115
/**
102116
* see org.apache.spark.sql.execution.streaming.OffsetSeqLog
103117
* and org.apache.spark.sql.kafka010.JsonUtils
Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,28 @@ import org.springframework.context.annotation.Lazy
2626
import org.springframework.stereotype.Service
2727
import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig
2828
import za.co.absa.hyperdrive.trigger.models.enums.JobTypes
29-
import za.co.absa.hyperdrive.trigger.models.{JobInstanceParameters, SparkInstanceParameters}
29+
import za.co.absa.hyperdrive.trigger.models.{BeginningEndOffsets, JobInstanceParameters, SparkInstanceParameters}
3030

3131
import java.util.Properties
3232
import javax.inject.Inject
3333
import scala.concurrent.{ExecutionContext, Future}
34+
import scala.util.{Failure, Success}
3435

35-
trait HyperdriveOffsetComparisonService {
36+
/**
 * Offset related queries for hyperdrive jobs.
 */
trait HyperdriveOffsetService {

  /**
   * Tells the caller whether a new job instance has to be run for the given job parameters.
   *
   * @param jobParameters parameters of the job instance
   * @param ec ExecutionContext
   * @return true if a new job instance is required
   */
  def isNewJobInstanceRequired(jobParameters: JobInstanceParameters)(implicit ec: ExecutionContext): Future[Boolean]

  /**
   * Computes the number of messages that have not been ingested yet.
   *
   * @param jobParameters parameters of the job instance
   * @param ec ExecutionContext
   * @return the topic name together with the number of not ingested messages per partition, if available
   */
  def getNumberOfMessagesLeft(
    jobParameters: JobInstanceParameters
  )(implicit ec: ExecutionContext): Future[Option[(String, Map[Int, Long])]]
}
3843

3944
@Service
4045
@Lazy
41-
class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig,
42-
@Lazy checkpointService: CheckpointService,
43-
@Lazy userGroupInformationService: UserGroupInformationService,
44-
kafkaService: KafkaService
45-
) extends HyperdriveOffsetComparisonService {
46+
class HyperdriveOffsetServiceImpl @Inject() (sparkConfig: SparkConfig,
47+
@Lazy checkpointService: CheckpointService,
48+
@Lazy userGroupInformationService: UserGroupInformationService,
49+
kafkaService: KafkaService
50+
) extends HyperdriveOffsetService {
4651
private val logger = LoggerFactory.getLogger(this.getClass)
4752
private val HyperdriveCheckpointKey = "writer.common.checkpoint.location"
4853
private val HyperdriveKafkaTopicKey = "reader.kafka.topic"
@@ -52,6 +57,58 @@ class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig,
5257
private val ListDelimiter = ','
5358
private val defaultDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"
5459

60+
/**
 * Computes how many messages of the job's Kafka topic have not been ingested yet.
 *
 * For every partition the result is the Kafka end offset minus the offset found in the latest committed
 * checkpoint. If no committed offset exists for a partition, or it is not greater than the Kafka beginning
 * offset, the whole available range (end offset - beginning offset) is reported. A committed offset that lies
 * beyond the Kafka end offset yields a negative number.
 *
 * @param jobParameters Parameters for the job instance. Should contain at least
 *                       - reader.kafka.topic
 *                       - reader.kafka.brokers
 *                       - writer.common.checkpoint.location
 * @param ec ExecutionContext
 * @return the topic name together with the number of not ingested messages for each partition, or None if
 *         the Kafka or HDFS parameters are missing, the response from Kafka is inconsistent, or the
 *         checkpoint could not be read.
 */
def getNumberOfMessagesLeft(
  jobParameters: JobInstanceParameters
)(implicit ec: ExecutionContext): Future[Option[(String, Map[Int, Long])]] = {
  val kafkaParametersOpt = getKafkaParameters(jobParameters)
  val hdfsParametersOpt: Option[HdfsParameters] = getResolvedAppArguments(jobParameters).flatMap(getHdfsParameters)

  if (kafkaParametersOpt.isEmpty) {
    logger.debug(s"Kafka parameters were not found in job definition $jobParameters")
  }

  Future(
    for {
      kafkaParameters <- kafkaParametersOpt
      hdfsParameters <- hdfsParametersOpt
    } yield {
      val kafkaOffsets = kafkaService.getBeginningEndOffsets(kafkaParameters._1, kafkaParameters._2)
      kafkaOffsets match {
        // Only proceed when both offset maps are present and describe the same set of partitions.
        case BeginningEndOffsets(topic, beginningOffsets, endOffsets)
            if beginningOffsets.nonEmpty && endOffsets.nonEmpty && beginningOffsets.keySet == endOffsets.keySet =>
          val ugi = userGroupInformationService.loginUserFromKeytab(hdfsParameters.principal, hdfsParameters.keytab)
          // NOTE(review): the offsets of the first topic found in the checkpoint are used; presumably the
          // checkpoint only contains the job's topic - confirm. headOption keeps an empty offsets map from
          // throwing; it is treated like a checkpoint without committed offsets.
          val hdfsOffsetsTry = checkpointService
            .getLatestCommittedOffset(hdfsParameters)(ugi)
            .map(_.flatMap(_.headOption.map(_._2)))

          hdfsOffsetsTry match {
            case Failure(exception) =>
              // Do not hide the reason why no status can be reported.
              logger.warn(s"Failed to read the latest committed offsets for topic: $topic", exception)
              None
            case Success(hdfsOffsetsOption) =>
              val messagesLeft = beginningOffsets.map { case (partition, kafkaBeginningOffset) =>
                val kafkaEndOffset = endOffsets(partition)
                // A committed offset at or before the beginning offset (or a missing one) means that
                // everything currently available in the partition still has to be ingested.
                val firstNotIngestedOffset = hdfsOffsetsOption
                  .flatMap(_.get(partition))
                  .filter(_ > kafkaBeginningOffset)
                  .getOrElse(kafkaBeginningOffset)
                partition -> (kafkaEndOffset - firstNotIngestedOffset)
              }
              Some((topic, messagesLeft))
          }
        case _ =>
          logger.warn(s"Inconsistent response from kafka for topic: ${kafkaOffsets.topic}")
          None
      }
    }
  ).map(_.flatten)
}
111+
55112
/**
56113
* @param jobParameters Parameters for the job instance. Should contain at least
57114
* - reader.kafka.topic
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.api.rest.services
17+
18+
import org.slf4j.LoggerFactory
19+
import org.springframework.stereotype.Service
20+
import za.co.absa.hyperdrive.trigger.models.{IngestionStatus, TopicStatus}
21+
import za.co.absa.hyperdrive.trigger.models.enums.JobTypes
22+
import za.co.absa.hyperdrive.trigger.persistance.WorkflowRepository
23+
24+
import scala.concurrent.{ExecutionContext, Future}
25+
import scala.util.{Failure, Success}
26+
27+
/**
 * Service providing information about the ingestion progress of workflows.
 */
trait HyperdriveService {
  protected val workflowRepository: WorkflowRepository
  protected val jobTemplateService: JobTemplateService
  protected val hyperdriveOffsetService: HyperdriveOffsetService

  /**
   * Returns the ingestion status of the jobs of a workflow.
   *
   * @param id id of the workflow
   * @param ec ExecutionContext
   * @return ingestion statuses of the workflow's jobs
   */
  def getIngestionStatus(id: Long)(implicit ec: ExecutionContext): Future[Seq[IngestionStatus]]
}
34+
35+
/**
 * Default implementation of HyperdriveService: loads a workflow, resolves its job templates and reports an
 * ingestion status for every resolved job.
 */
@Service
class HyperdriveServiceImpl(
  override protected val workflowRepository: WorkflowRepository,
  override protected val jobTemplateService: JobTemplateService,
  override protected val hyperdriveOffsetService: HyperdriveOffsetService
) extends HyperdriveService {
  private val logger = LoggerFactory.getLogger(this.getClass)

  /**
   * Builds one IngestionStatus per resolved job of the workflow.
   *
   * Only jobs of type Hyperdrive get a topic status; for all other jobs, and for Hyperdrive jobs whose
   * number of messages left could not be determined, the topic status is None.
   *
   * @param id id of the workflow
   * @param ec ExecutionContext
   * @return ingestion statuses of the workflow's jobs
   */
  override def getIngestionStatus(id: Long)(implicit ec: ExecutionContext): Future[Seq[IngestionStatus]] =
    for {
      workflow <- workflowRepository.getWorkflow(id)
      resolvedJobs <- jobTemplateService.resolveJobTemplate(workflow.dagDefinitionJoined)
      ingestionStatuses <- Future.sequence(
        resolvedJobs.map { resolvedJob =>
          // Single place where the status of the current job is assembled.
          def statusWith(topicStatus: Option[TopicStatus]): IngestionStatus =
            IngestionStatus(
              jobName = resolvedJob.name,
              jobType = resolvedJob.jobParameters.jobType.name,
              topicStatus = topicStatus
            )

          if (resolvedJob.jobParameters.jobType == JobTypes.Hyperdrive) {
            hyperdriveOffsetService.getNumberOfMessagesLeft(resolvedJob.jobParameters).transform {
              case Success(messagesLeftOpt) =>
                Success(
                  statusWith(messagesLeftOpt.map { case (topic, messagesToIngest) =>
                    TopicStatus(topic, messagesToIngest)
                  })
                )
              case Failure(exception) =>
                // A failing job must not fail the whole request; it is reported without a topic status.
                logger.error(s"Failed to get number of messages left to ingest for a workflow: $id", exception)
                Success(statusWith(None))
            }
          } else {
            Future.successful(statusWith(None))
          }
        }
      )
    } yield ingestionStatuses
}

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.springframework.stereotype.Service
2222
import org.springframework.util.ConcurrentLruCache
2323
import za.co.absa.hyperdrive.trigger.api.rest.services.KafkaServiceImpl.{BeginningOffsets, EndOffsets, OffsetFunction}
2424
import za.co.absa.hyperdrive.trigger.configuration.application.GeneralConfig
25+
import za.co.absa.hyperdrive.trigger.models.BeginningEndOffsets
2526

2627
import java.util.Properties
2728
import java.util.UUID.randomUUID
@@ -31,6 +32,7 @@ import scala.collection.JavaConverters._
3132
/**
 * Access to the offsets of Kafka topics.
 */
trait KafkaService {

  /**
   * @param topic name of the topic
   * @param consumerProperties properties used for the Kafka consumer
   * @return beginning offsets of the topic, presumably keyed by partition
   */
  def getBeginningOffsets(topic: String, consumerProperties: Properties): Map[Int, Long]

  /**
   * @param topic name of the topic
   * @param consumerProperties properties used for the Kafka consumer
   * @return end offsets of the topic, presumably keyed by partition
   */
  def getEndOffsets(topic: String, consumerProperties: Properties): Map[Int, Long]

  /**
   * @param topic name of the topic
   * @param consumerProperties properties used for the Kafka consumer
   * @return the topic together with its beginning and end offsets
   */
  def getBeginningEndOffsets(topic: String, consumerProperties: Properties): BeginningEndOffsets
}
3537

3638
@Service
@@ -50,6 +52,14 @@ class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaSer
5052
getOffsets(topic, consumerProperties, EndOffsets)
5153
}
5254

55+
/**
 * Fetches the beginning offsets and then the end offsets of a topic and bundles them with the topic name.
 *
 * @param topic name of the topic
 * @param consumerProperties properties used for the Kafka consumer
 * @return the topic with its beginning and end offsets
 */
def getBeginningEndOffsets(topic: String, consumerProperties: Properties): BeginningEndOffsets = {
  val beginning = getOffsets(topic, consumerProperties, BeginningOffsets)
  val end = getOffsets(topic, consumerProperties, EndOffsets)
  BeginningEndOffsets(topic = topic, beginningOffsets = beginning, endOffsets = end)
}
62+
5363
def createKafkaConsumer(propertiesThreadId: (Properties, Long)): KafkaConsumer[String, String] = {
5464
logger.info(
5565
s"Creating new Kafka Consumer for thread id ${propertiesThreadId._2} and" +
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.models
17+
18+
/**
 * Beginning and end offsets of a Kafka topic.
 *
 * @param topic name of the topic
 * @param beginningOffsets beginning offset for each partition
 * @param endOffsets end offset for each partition
 */
case class BeginningEndOffsets(topic: String, beginningOffsets: Map[Int, Long], endOffsets: Map[Int, Long])
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.models
17+
18+
/**
 * Ingestion status of a single job of a workflow.
 *
 * @param jobName name of the job
 * @param jobType name of the job's type
 * @param topicStatus status of the topic the job ingests from, if available
 */
case class IngestionStatus(jobName: String, jobType: String, topicStatus: Option[TopicStatus])

/**
 * Number of messages of a topic that are still to be ingested.
 *
 * @param topic name of the topic
 * @param messagesToIngest number of messages to ingest for each partition
 */
case class TopicStatus(
  topic: String,
  messagesToIngest: Map[Int, Long]
)

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.springframework.beans.factory.BeanFactory
3434
import org.springframework.context.annotation.Lazy
3535
import za.co.absa.hyperdrive.trigger.scheduler.executors.shell.ShellExecutor
3636
import org.springframework.stereotype.Component
37-
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService
37+
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetService
3838
import za.co.absa.hyperdrive.trigger.configuration.application.{SchedulerConfig, SparkConfig}
3939
import za.co.absa.hyperdrive.trigger.scheduler.notifications.NotificationSender
4040

@@ -49,7 +49,7 @@ class Executors @Inject() (
4949
beanFactory: BeanFactory,
5050
implicit val sparkConfig: SparkConfig,
5151
schedulerConfig: SchedulerConfig,
52-
@Lazy hyperdriveOffsetComparisonService: HyperdriveOffsetComparisonService
52+
@Lazy hyperdriveOffsetComparisonService: HyperdriveOffsetService
5353
) {
5454
private val logger = LoggerFactory.getLogger(this.getClass)
5555
private implicit val executionContext: ExecutionContextExecutor =

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package za.co.absa.hyperdrive.trigger.scheduler.executors.spark
1717

1818
import org.slf4j.LoggerFactory
19-
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService
19+
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetService
2020
import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig
2121
import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses
2222
import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters}
@@ -31,23 +31,23 @@ object HyperdriveExecutor {
3131
jobParameters: SparkInstanceParameters,
3232
updateJob: JobInstance => Future[Unit],
3333
sparkClusterService: SparkClusterService,
34-
offsetComparisonService: HyperdriveOffsetComparisonService
34+
offsetService: HyperdriveOffsetService
3535
)(implicit executionContext: ExecutionContext, sparkConfig: SparkConfig): Future[Unit] =
3636
jobInstance.executorJobId match {
37-
case None => submitJob(sparkClusterService, offsetComparisonService, jobInstance, jobParameters, updateJob)
37+
case None => submitJob(sparkClusterService, offsetService, jobInstance, jobParameters, updateJob)
3838
case Some(executorJobId) =>
3939
SparkExecutor.updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService)
4040
}
4141

4242
private def submitJob(sparkClusterService: SparkClusterService,
43-
offsetComparisonService: HyperdriveOffsetComparisonService,
43+
offsetService: HyperdriveOffsetService,
4444
jobInstance: JobInstance,
4545
jobParameters: SparkInstanceParameters,
4646
updateJob: JobInstance => Future[Unit]
4747
)(implicit executionContext: ExecutionContext) = {
4848
logger.debug("Using HyperdriveExecutor")
4949
for {
50-
newJobRequired <- offsetComparisonService.isNewJobInstanceRequired(jobParameters)
50+
newJobRequired <- offsetService.isNewJobInstanceRequired(jobParameters)
5151
_ <-
5252
if (newJobRequired) sparkClusterService.submitJob(jobInstance, jobParameters, updateJob)
5353
else updateJob(jobInstance.copy(jobStatus = JobStatuses.NoData))

0 commit comments

Comments
 (0)