Skip to content

Commit 8a4cef9

Browse files
jozefbakusTomas Dlabka
andauthored
Feature/783 verify kafka topic access (#786)
#783 verify kafka topic access UI --------- Co-authored-by: Tomas Dlabka <[email protected]>
1 parent b24ed4a commit 8a4cef9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2187
-10
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
<log4j.version>2.17.1</log4j.version>
8686
<slf4j.log4j2.version>1.7.26</slf4j.log4j2.version>
8787
<spring.security.ldap.version>5.6.1</spring.security.ldap.version>
88+
<spring.kerberos.version>1.0.1.RELEASE</spring.kerberos.version>
8889
<scalatest.version>3.0.5</scalatest.version>
8990
<h2database.version>2.1.210</h2database.version>
9091
<testcontainers.postgresql.version>1.15.3</testcontainers.postgresql.version>
@@ -341,6 +342,11 @@
341342
<groupId>com.amazonaws</groupId>
342343
<artifactId>aws-java-sdk-sts</artifactId>
343344
</dependency>
345+
<dependency>
346+
<groupId>org.springframework.security.kerberos</groupId>
347+
<artifactId>spring-security-kerberos-client</artifactId>
348+
<version>${spring.kerberos.version}</version>
349+
</dependency>
344350

345351
<!-- For Liquibase yaml -->
346352
<dependency>

src/main/resources/application.properties

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,12 @@ db.numThreads=4
130130

131131
db.skip.liquibase=false
132132
spring.liquibase.change-log=classpath:/db_scripts/liquibase/db.changelog.yml
133+
134+
# Confluent client config
135+
confluent.baseUrls=url
136+
confluent.authPath=/confluent/auth
137+
confluent.user=user-name
138+
confluent.base64Credentials=encoded_credentials
139+
confluent.clusterType=cluster-type
140+
confluent.kafkaClusterId=cluster-id
141+
confluent.retries=1
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.api.rest.client
17+
18+
trait ApiCaller {
19+
def call[T](fn: String => T): T
20+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.api.rest.client
17+
18+
import org.slf4j.Logger
19+
import org.slf4j.LoggerFactory
20+
import org.springframework.http.{HttpEntity, HttpHeaders, HttpMethod, HttpStatus, ResponseEntity}
21+
import org.springframework.security.kerberos.client.KerberosRestTemplate
22+
import org.springframework.util.LinkedMultiValueMap
23+
import org.springframework.web.client.RestTemplate
24+
25+
import scala.collection.JavaConverters._
26+
import scala.util.Try
27+
28+
object AuthClient {
29+
30+
def apply(credentials: Credentials, apiCaller: ApiCaller, authEndpoint: String): AuthClient =
31+
credentials match {
32+
case standardCredentials: StandardCredentials =>
33+
createStandardAuthClient(apiCaller, standardCredentials, authEndpoint)
34+
case standardCredentialsBase64: StandardCredentialsBase64 =>
35+
createStandardBase64AuthClient(apiCaller, standardCredentialsBase64, authEndpoint)
36+
case kerberosCredentials: KerberosCredentials =>
37+
createSpnegoAuthClient(apiCaller, kerberosCredentials, authEndpoint)
38+
}
39+
40+
private def createStandardAuthClient(
41+
apiCaller: ApiCaller,
42+
credentials: StandardCredentials,
43+
path: String
44+
): StandardAuthClient = {
45+
val restTemplate = RestTemplateSingleton.instance
46+
new StandardAuthClient(credentials, restTemplate, apiCaller, path)
47+
}
48+
49+
private def createStandardBase64AuthClient(
50+
apiCaller: ApiCaller,
51+
credentials: StandardCredentialsBase64,
52+
path: String
53+
): StandardBase64AuthClient = {
54+
val restTemplate = RestTemplateSingleton.instance
55+
new StandardBase64AuthClient(credentials, restTemplate, apiCaller, path)
56+
}
57+
58+
private def createSpnegoAuthClient(
59+
apiCaller: ApiCaller,
60+
credentials: KerberosCredentials,
61+
path: String
62+
): SpnegoAuthClient = {
63+
val restTemplate = new KerberosRestTemplate(credentials.keytabLocation, credentials.username)
64+
restTemplate.setErrorHandler(NoOpErrorHandler)
65+
new SpnegoAuthClient(credentials, restTemplate, apiCaller, path)
66+
}
67+
}
68+
69+
sealed abstract class AuthClient(
70+
credentials: Credentials,
71+
restTemplate: RestTemplate,
72+
apiCaller: ApiCaller,
73+
url: String => String
74+
) {
75+
protected val logger: Logger = LoggerFactory.getLogger(this.getClass)
76+
77+
@throws[UnauthorizedException]
78+
def authenticate(): HttpHeaders =
79+
apiCaller.call { baseUrl =>
80+
val response = requestAuthentication(url(baseUrl))
81+
val statusCode = response.getStatusCode
82+
83+
statusCode match {
84+
case HttpStatus.OK =>
85+
logger.info(s"Authentication successful")
86+
getAuthHeaders(response)
87+
case _ =>
88+
throw UnauthorizedException(s"Authentication failure ($statusCode)")
89+
}
90+
}
91+
92+
protected def requestAuthentication(url: String): ResponseEntity[String]
93+
94+
private def getAuthHeaders(response: ResponseEntity[String]): HttpHeaders = {
95+
val headers = response.getHeaders
96+
val sessionCookie = headers.get("set-cookie").asScala.head
97+
val csrfToken = Try(headers.get("X-CSRF-TOKEN").asScala.head).toOption
98+
99+
val resultHeaders = new HttpHeaders()
100+
101+
logger.debug(s"Session Cookie: $sessionCookie")
102+
resultHeaders.add("cookie", sessionCookie)
103+
104+
csrfToken match {
105+
case Some(ct) =>
106+
logger.debug(s"CSRF Token: $ct")
107+
resultHeaders.add("X-CSRF-TOKEN", ct)
108+
case None =>
109+
logger.debug(s"CSRF Token not found")
110+
}
111+
112+
resultHeaders
113+
}
114+
}
115+
116+
class SpnegoAuthClient(
117+
credentials: KerberosCredentials,
118+
restTemplate: RestTemplate,
119+
apiCaller: ApiCaller,
120+
path: String
121+
) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") {
122+
override protected def requestAuthentication(url: String): ResponseEntity[String] = {
123+
logger.info(
124+
s"Authenticating via SPNEGO ($url): user `${credentials.username}`, with keytab `${credentials.keytabLocation}`"
125+
)
126+
restTemplate.getForEntity(url, classOf[String])
127+
}
128+
}
129+
130+
class StandardAuthClient(
131+
credentials: StandardCredentials,
132+
restTemplate: RestTemplate,
133+
apiCaller: ApiCaller,
134+
path: String
135+
) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") {
136+
override protected def requestAuthentication(url: String): ResponseEntity[String] = {
137+
val requestParts = new LinkedMultiValueMap[String, String]
138+
requestParts.add("username", credentials.username)
139+
requestParts.add("password", credentials.password)
140+
141+
logger.info(s"Authenticating via username and password ($url): user `${credentials.username}`")
142+
restTemplate.postForEntity(url, requestParts, classOf[String])
143+
}
144+
}
145+
146+
class StandardBase64AuthClient(
147+
credentials: StandardCredentialsBase64,
148+
restTemplate: RestTemplate,
149+
apiCaller: ApiCaller,
150+
path: String
151+
) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") {
152+
override protected def requestAuthentication(url: String): ResponseEntity[String] = {
153+
val headers = new HttpHeaders()
154+
headers.add("Authorization", "Basic " + credentials.base64Credentials)
155+
val requestEntity = new HttpEntity(null, headers)
156+
restTemplate.exchange(url, HttpMethod.GET, requestEntity, classOf[String])
157+
}
158+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.api.rest.client
17+
18+
sealed abstract class Credentials
19+
20+
case class StandardCredentials(username: String, password: String) extends Credentials
21+
22+
case class StandardCredentialsBase64(base64Credentials: String) extends Credentials
23+
24+
case class KerberosCredentials(username: String, keytabLocation: String) extends Credentials
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.api.rest.client
17+
18+
import org.apache.commons.lang3.exception.ExceptionUtils
19+
import org.slf4j.LoggerFactory
20+
import org.springframework.web.client
21+
import org.springframework.web.client.ResourceAccessException
22+
import za.co.absa.hyperdrive.trigger.api.rest.client.CrossHostApiCaller.logger
23+
24+
import scala.annotation.tailrec
25+
import scala.util.Failure
26+
import scala.util.Random
27+
import scala.util.Try
28+
29+
class CrossHostApiCaller private (apiBaseUrls: Vector[String], maxTryCount: Int, private var currentHostIndex: Int)
30+
extends ApiCaller {
31+
def baseUrlsCount: Int = apiBaseUrls.size
32+
33+
def currentBaseUrl: String = apiBaseUrls(currentHostIndex)
34+
35+
def nextBaseUrl(): String = {
36+
currentHostIndex = (currentHostIndex + 1) % baseUrlsCount
37+
currentBaseUrl
38+
}
39+
40+
override def call[T](fn: String => T): T = {
41+
def logFailure(error: Throwable, url: String, attemptNumber: Int, nextUrl: Option[String]): Unit = {
42+
val rootCause = ExceptionUtils.getRootCauseMessage(error)
43+
val switching = nextUrl.map(s => s", switching host to $s").getOrElse("")
44+
logger.warn(s"Request failed on host $url (attempt $attemptNumber of $maxTryCount)$switching - $rootCause")
45+
}
46+
47+
@tailrec
48+
def attempt(url: String, attemptNumber: Int, urlsTried: Int): Try[T] = {
49+
val result = Try {
50+
fn(url)
51+
}.recoverWith { case e @ (_: ResourceAccessException | _: client.RestClientException) =>
52+
Failure(ApiClientException("Server non-responsive", e))
53+
}
54+
// using match instead of recoverWith to make the function @tailrec
55+
result match {
56+
case Failure(e: RetryableException) if attemptNumber < maxTryCount =>
57+
logFailure(e, url, attemptNumber, None)
58+
attempt(url, attemptNumber + 1, urlsTried)
59+
case Failure(e: RetryableException) if urlsTried < baseUrlsCount =>
60+
val nextUrl = nextBaseUrl()
61+
logFailure(e, url, attemptNumber, Option(nextUrl))
62+
attempt(nextUrl, 1, urlsTried + 1)
63+
case _ => result
64+
}
65+
}
66+
67+
attempt(currentBaseUrl, 1, 1).get
68+
}
69+
}
70+
71+
object CrossHostApiCaller {
72+
private val logger = LoggerFactory.getLogger(classOf[CrossHostApiCaller])
73+
74+
final val DefaultUrlsRetryCount: Int = 0
75+
76+
private def createInstance(
77+
apiBaseUrls: Seq[String],
78+
urlsRetryCount: Int,
79+
startWith: Option[Int]
80+
): CrossHostApiCaller = {
81+
val maxTryCount: Int = (
82+
if (urlsRetryCount < 0) {
83+
logger.warn(
84+
s"Urls retry count cannot be negative ($urlsRetryCount). Using default number of retries instead ($DefaultUrlsRetryCount)."
85+
) // scalastyle:ignore maxLineLength
86+
DefaultUrlsRetryCount
87+
} else {
88+
urlsRetryCount
89+
}
90+
) + 1
91+
val currentHostIndex = startWith.getOrElse(Random.nextInt(Math.max(apiBaseUrls.size, 1)))
92+
new CrossHostApiCaller(apiBaseUrls.toVector, maxTryCount, currentHostIndex)
93+
}
94+
95+
def apply(
96+
apiBaseUrls: Seq[String],
97+
urlsRetryCount: Int = DefaultUrlsRetryCount,
98+
startWith: Option[Int] = None
99+
): CrossHostApiCaller =
100+
createInstance(apiBaseUrls, urlsRetryCount, startWith)
101+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.api.rest.client
17+
18+
abstract class RetryableException(message: String, cause: Throwable) extends Exception(message, cause)
19+
20+
final case class ApiClientException(private val message: String, private val cause: Throwable = None.orNull)
21+
extends RetryableException(message, cause)
22+
23+
final case class UnauthorizedException(private val message: String, private val cause: Throwable = None.orNull)
24+
extends Exception(message, cause)
25+
26+
final case class NotFoundException(private val message: String, private val cause: Throwable = None.orNull)
27+
extends Exception(message, cause)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.api.rest.client
17+
18+
import org.springframework.http.client.ClientHttpResponse
19+
import org.springframework.web.client.ResponseErrorHandler
20+
21+
object NoOpErrorHandler extends ResponseErrorHandler {
22+
override def hasError(response: ClientHttpResponse): Boolean = false
23+
24+
override def handleError(response: ClientHttpResponse): Unit = {}
25+
}

0 commit comments

Comments
 (0)