Skip to content

Commit d906553

Browse files
committed
Start refactor to wrappers round search client to make testing easier
1 parent 4218fc4 commit d906553

File tree

5 files changed

+193
-84
lines changed

5 files changed

+193
-84
lines changed

src/main/kotlin/no/java/conf/plugins/Search.kt

+14-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import io.ktor.server.routing.route
1414
import io.ktor.server.routing.routing
1515
import no.java.conf.model.search.TextSearchRequest
1616
import no.java.conf.service.SearchService
17+
import no.java.conf.service.search.ElasticIndexer
18+
import no.java.conf.service.search.ElasticIngester
1719

1820
private val logger = KotlinLogging.logger {}
1921

@@ -49,21 +51,30 @@ fun Application.searchClient() =
4951

5052
fun searchService(
5153
searchClient: SearchClient,
54+
indexer: ElasticIndexer,
55+
ingester: ElasticIngester,
5256
skipIndex: Boolean
5357
) = SearchService(
5458
client = searchClient,
59+
indexer = indexer,
60+
ingester = ingester,
5561
skipIndex = skipIndex,
5662
)
5763

58-
fun Application.searchService() =
59-
searchService(
60-
searchClient = searchClient(),
64+
fun Application.searchService(): SearchService {
65+
val searchClient = searchClient()
66+
67+
return searchService(
68+
searchClient = searchClient,
69+
indexer = ElasticIndexer(searchClient),
70+
ingester = ElasticIngester(searchClient),
6171
skipIndex =
6272
environment.config
6373
.property("elastic.skipindex")
6474
.getString()
6575
.toBoolean()
6676
)
77+
}
6778

6879
fun Application.configureSearchRouting(service: SearchService) {
6980
routing {

src/main/kotlin/no/java/conf/service/SearchService.kt

+6-75
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,8 @@ import arrow.core.raise.ensure
55
import arrow.core.raise.ensureNotNull
66
import com.jillesvangurp.jsondsl.json
77
import com.jillesvangurp.ktsearch.Aggregations
8-
import com.jillesvangurp.ktsearch.BulkItemCallBack
9-
import com.jillesvangurp.ktsearch.BulkResponse
10-
import com.jillesvangurp.ktsearch.OperationType
118
import com.jillesvangurp.ktsearch.SearchClient
12-
import com.jillesvangurp.ktsearch.bulk
139
import com.jillesvangurp.ktsearch.count
14-
import com.jillesvangurp.ktsearch.createIndex
15-
import com.jillesvangurp.ktsearch.deleteIndex
1610
import com.jillesvangurp.ktsearch.parseHits
1711
import com.jillesvangurp.ktsearch.parsedBuckets
1812
import com.jillesvangurp.ktsearch.search
@@ -25,7 +19,6 @@ import com.jillesvangurp.searchdsls.querydsl.exists
2519
import com.jillesvangurp.searchdsls.querydsl.nested
2620
import com.jillesvangurp.searchdsls.querydsl.simpleQueryString
2721
import com.jillesvangurp.searchdsls.querydsl.terms
28-
import com.jillesvangurp.serializationext.DEFAULT_JSON
2922
import io.github.oshai.kotlinlogging.KotlinLogging
3023
import no.java.conf.model.AggregationsNotFound
3124
import no.java.conf.model.ApiError
@@ -42,16 +35,12 @@ import no.java.conf.model.search.YearAggregate
4235
import no.java.conf.model.search.hasFilter
4336
import no.java.conf.model.search.hasQuery
4437
import no.java.conf.model.sessions.Session
45-
import no.java.conf.model.sessions.Speaker
46-
import kotlin.time.Duration.Companion.seconds
38+
import no.java.conf.service.search.ElasticIndexer
39+
import no.java.conf.service.search.ElasticIngester
4740
import kotlin.time.measureTimedValue
4841

4942
private const val INDEX_NAME = "javazone"
5043

51-
private const val REPLICAS = 0
52-
private const val SHARDS = 3
53-
private const val REFRESH = 10
54-
5544
private val logger = KotlinLogging.logger {}
5645

5746
enum class State {
@@ -62,6 +51,8 @@ enum class State {
6251

6352
class SearchService(
6453
private val client: SearchClient,
54+
private val indexer: ElasticIndexer,
55+
private val ingester: ElasticIngester,
6556
private val skipIndex: Boolean,
6657
) {
6758
private var readyState = State.NEW
@@ -78,30 +69,7 @@ class SearchService(
7869
if (!skipIndex) {
7970
logger.debug { "Creating index" }
8071

81-
client.deleteIndex(INDEX_NAME, ignoreUnavailable = true)
82-
83-
client.createIndex(INDEX_NAME) {
84-
settings {
85-
replicas = REPLICAS
86-
shards = SHARDS
87-
refreshInterval = REFRESH.seconds
88-
}
89-
mappings(dynamicEnabled = false) {
90-
text(Session::title)
91-
text(Session::abstract)
92-
text(Session::intendedAudience)
93-
keyword(Session::year)
94-
keyword(Session::video)
95-
keyword(Session::sessionId)
96-
keyword(Session::format)
97-
keyword(Session::language)
98-
nestedField("speakers") {
99-
text(Speaker::name)
100-
keyword(Speaker::twitter)
101-
text(Speaker::bio)
102-
}
103-
}
104-
}
72+
indexer.recreateIndex(INDEX_NAME)
10573
}
10674

10775
logger.debug { "State -> Mapped" }
@@ -121,44 +89,7 @@ class SearchService(
12189
if (!skipIndex) {
12290
logger.debug { "Bulk" }
12391

124-
val itemCallBack =
125-
object : BulkItemCallBack {
126-
override fun bulkRequestFailed(
127-
e: Exception,
128-
ops: List<Pair<String, String?>>,
129-
) {
130-
logger.error(e) { "Bulk failed" }
131-
}
132-
133-
override fun itemFailed(
134-
operationType: OperationType,
135-
item: BulkResponse.ItemDetails,
136-
) {
137-
logger.warn { "${operationType.name} failed ${item.id} with ${item.status}" }
138-
}
139-
140-
override fun itemOk(
141-
operationType: OperationType,
142-
item: BulkResponse.ItemDetails,
143-
) {
144-
logger.trace {
145-
"${operationType.name} completed ${item.id} seq ${item.seqNo} p_term ${item.primaryTerm}"
146-
}
147-
}
148-
}
149-
150-
val timeTaken =
151-
measureTimedValue {
152-
client.bulk(callBack = itemCallBack) {
153-
sessions.forEach { session ->
154-
index(
155-
source = DEFAULT_JSON.encodeToString(Session.serializer(), session),
156-
index = INDEX_NAME,
157-
id = session.sessionId,
158-
)
159-
}
160-
}
161-
}
92+
val timeTaken = measureTimedValue { ingester.ingest(INDEX_NAME, sessions) }
16293

16394
logger.info { "Time taken to index - $timeTaken" }
16495
} else {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package no.java.conf.service.search
2+
3+
import com.jillesvangurp.ktsearch.SearchClient
4+
import com.jillesvangurp.ktsearch.createIndex
5+
import com.jillesvangurp.ktsearch.deleteIndex
6+
import no.java.conf.model.sessions.Session
7+
import no.java.conf.model.sessions.Speaker
8+
import kotlin.time.Duration.Companion.seconds
9+
10+
private const val REPLICAS = 0
11+
private const val SHARDS = 3
12+
private const val REFRESH = 10
13+
14+
class ElasticIndexer(
15+
private val client: SearchClient,
16+
) {
17+
suspend fun recreateIndex(indexName: String) {
18+
client.deleteIndex(indexName, ignoreUnavailable = true)
19+
20+
client.createIndex(indexName) {
21+
settings {
22+
replicas = REPLICAS
23+
shards = SHARDS
24+
refreshInterval = REFRESH.seconds
25+
}
26+
mappings(dynamicEnabled = false) {
27+
text(Session::title)
28+
text(Session::abstract)
29+
text(Session::intendedAudience)
30+
keyword(Session::year)
31+
keyword(Session::video)
32+
keyword(Session::sessionId)
33+
keyword(Session::format)
34+
keyword(Session::language)
35+
nestedField("speakers") {
36+
text(Speaker::name)
37+
keyword(Speaker::twitter)
38+
text(Speaker::bio)
39+
}
40+
}
41+
}
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package no.java.conf.service.search
2+
3+
import com.jillesvangurp.ktsearch.BulkItemCallBack
4+
import com.jillesvangurp.ktsearch.BulkResponse
5+
import com.jillesvangurp.ktsearch.OperationType
6+
import com.jillesvangurp.ktsearch.SearchClient
7+
import com.jillesvangurp.ktsearch.bulk
8+
import com.jillesvangurp.serializationext.DEFAULT_JSON
9+
import io.github.oshai.kotlinlogging.KotlinLogging
10+
import no.java.conf.model.sessions.Session
11+
12+
private val logger = KotlinLogging.logger {}
13+
14+
class ElasticIngester(
15+
private val client: SearchClient,
16+
) {
17+
val itemCallBack =
18+
object : BulkItemCallBack {
19+
override fun bulkRequestFailed(
20+
e: Exception,
21+
ops: List<Pair<String, String?>>,
22+
) {
23+
logger.error(e) { "Bulk failed" }
24+
}
25+
26+
override fun itemFailed(
27+
operationType: OperationType,
28+
item: BulkResponse.ItemDetails,
29+
) {
30+
logger.warn { "${operationType.name} failed ${item.id} with ${item.status}" }
31+
}
32+
33+
override fun itemOk(
34+
operationType: OperationType,
35+
item: BulkResponse.ItemDetails,
36+
) {
37+
logger.trace {
38+
"${operationType.name} completed ${item.id} seq ${item.seqNo} p_term ${item.primaryTerm}"
39+
}
40+
}
41+
}
42+
43+
suspend fun ingest(
44+
indexName: String,
45+
sessions: List<Session>
46+
) {
47+
client.bulk(callBack = itemCallBack) {
48+
sessions.forEach { session ->
49+
index(
50+
source = DEFAULT_JSON.encodeToString(Session.serializer(), session),
51+
index = indexName,
52+
id = session.sessionId,
53+
)
54+
}
55+
}
56+
}
57+
}

src/test/kotlin/no/java/conf/StateTest.kt

+73-6
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,26 @@ import io.ktor.client.request.get
55
import io.ktor.client.statement.bodyAsText
66
import io.ktor.http.HttpStatusCode
77
import io.ktor.server.testing.testApplication
8+
import io.mockk.coEvery
9+
import io.mockk.just
810
import io.mockk.mockk
11+
import io.mockk.runs
912
import kotlinx.coroutines.runBlocking
1013
import no.java.conf.plugins.configureSearchRouting
1114
import no.java.conf.service.SearchService
15+
import no.java.conf.service.search.ElasticIndexer
16+
import no.java.conf.service.search.ElasticIngester
1217
import kotlin.test.Test
1318
import kotlin.test.assertEquals
1419

1520
class StateTest {
1621
@Test
17-
fun testNewState() {
22+
fun testNew() {
1823
val searchClient = mockk<SearchClient>()
24+
val indexer = mockk<ElasticIndexer>()
25+
val ingester = mockk<ElasticIngester>()
1926

20-
val service = SearchService(searchClient, false)
27+
val service = SearchService(searchClient, indexer, ingester, false)
2128

2229
testApplication {
2330
application {
@@ -32,10 +39,66 @@ class StateTest {
3239
}
3340

3441
@Test
35-
fun testSkipIndex() {
42+
fun testMapped() {
3643
val searchClient = mockk<SearchClient>()
44+
val indexer = mockk<ElasticIndexer>()
45+
val ingester = mockk<ElasticIngester>()
3746

38-
val service = SearchService(searchClient, true)
47+
val service = SearchService(searchClient, indexer, ingester, false)
48+
49+
coEvery { indexer.recreateIndex(any()) } just runs
50+
51+
runBlocking {
52+
service.setup()
53+
}
54+
55+
testApplication {
56+
application {
57+
configureSearchRouting(service)
58+
}
59+
60+
client.get("/api/search/state").apply {
61+
assertEquals(HttpStatusCode.OK, status)
62+
assertEquals(bodyAsText(), "MAPPED")
63+
}
64+
}
65+
}
66+
67+
@Test
68+
fun testIndexed() {
69+
val searchClient = mockk<SearchClient>()
70+
val indexer = mockk<ElasticIndexer>()
71+
val ingester = mockk<ElasticIngester>()
72+
73+
val service = SearchService(searchClient, indexer, ingester, false)
74+
75+
coEvery { indexer.recreateIndex(any()) } just runs
76+
coEvery { ingester.ingest(any(), any()) } just runs
77+
78+
runBlocking {
79+
service.setup()
80+
service.ingest(emptyList())
81+
}
82+
83+
testApplication {
84+
application {
85+
configureSearchRouting(service)
86+
}
87+
88+
client.get("/api/search/state").apply {
89+
assertEquals(HttpStatusCode.OK, status)
90+
assertEquals(bodyAsText(), "INDEXED")
91+
}
92+
}
93+
}
94+
95+
@Test
96+
fun testSkipIndexNew() {
97+
val searchClient = mockk<SearchClient>()
98+
val indexer = mockk<ElasticIndexer>()
99+
val ingester = mockk<ElasticIngester>()
100+
101+
val service = SearchService(searchClient, indexer, ingester, true)
39102

40103
testApplication {
41104
application {
@@ -52,8 +115,10 @@ class StateTest {
52115
@Test
53116
fun testSkipIndexMapped() {
54117
val searchClient = mockk<SearchClient>()
118+
val indexer = mockk<ElasticIndexer>()
119+
val ingester = mockk<ElasticIngester>()
55120

56-
val service = SearchService(searchClient, true)
121+
val service = SearchService(searchClient, indexer, ingester, true)
57122

58123
runBlocking {
59124
service.setup()
@@ -74,8 +139,10 @@ class StateTest {
74139
@Test
75140
fun testSkipIndexIndexed() {
76141
val searchClient = mockk<SearchClient>()
142+
val indexer = mockk<ElasticIndexer>()
143+
val ingester = mockk<ElasticIngester>()
77144

78-
val service = SearchService(searchClient, true)
145+
val service = SearchService(searchClient, indexer, ingester, true)
79146

80147
runBlocking {
81148
service.setup()

0 commit comments

Comments
 (0)