Skip to content

Commit 2a1e810

Browse files
committed
Make PutDataObjectRequest call when alert transitions into COMPLETED state when multi tenancy is enabled
Signed-off-by: Sai Vikhyath Kudhroli <[email protected]>
1 parent c2c63e3 commit 2a1e810

3 files changed

Lines changed: 120 additions & 14 deletions

File tree

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ class AlertService(
7171
val client: Client,
7272
val xContentRegistry: NamedXContentRegistry,
7373
val alertIndices: AlertIndices,
74-
val sdkClient: SdkClient
74+
val sdkClient: SdkClient,
75+
val multiTenancyEnabled: Boolean = false
7576
) {
7677

7778
companion object {
@@ -745,18 +746,10 @@ class AlertService(
745746
throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert")
746747
}
747748
Alert.State.COMPLETED -> {
748-
deleteRequests.add(
749-
DeleteDataObjectRequest.builder()
750-
.index(alertsIndex)
751-
.id(alert.id)
752-
.routing(routingId)
753-
.tenantId(currentTenantId())
754-
.build()
755-
)
756-
if (alertIndices.isAlertHistoryEnabled()) {
749+
if (multiTenancyEnabled) {
757750
putRequests.add(
758751
PutDataObjectRequest.builder()
759-
.index(alertsHistoryIndex)
752+
.index(alertsIndex)
760753
.id(alert.id)
761754
.routing(routingId)
762755
.tenantId(currentTenantId())
@@ -765,7 +758,28 @@ class AlertService(
765758
.build()
766759
)
767760
} else {
768-
commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id)))
761+
deleteRequests.add(
762+
DeleteDataObjectRequest.builder()
763+
.index(alertsIndex)
764+
.id(alert.id)
765+
.routing(routingId)
766+
.tenantId(currentTenantId())
767+
.build()
768+
)
769+
if (alertIndices.isAlertHistoryEnabled()) {
770+
putRequests.add(
771+
PutDataObjectRequest.builder()
772+
.index(alertsHistoryIndex)
773+
.id(alert.id)
774+
.routing(routingId)
775+
.tenantId(currentTenantId())
776+
.overwriteIfExists(true)
777+
.dataObject(ToXContentObject { builder, _ -> alert.toXContentWithUser(builder) })
778+
.build()
779+
)
780+
} else {
781+
commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id)))
782+
}
769783
}
770784
}
771785
}

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
322322
client.threadPool().executor(ThreadPool.Names.GENERIC)
323323
)
324324

325-
val alertService = AlertService(client, xContentRegistry, alertIndices, sdkClient)
325+
val alertService = AlertService(client, xContentRegistry, alertIndices, sdkClient, MULTI_TENANCY_ENABLED.get(settings))
326326
val triggerService = TriggerService(scriptService)
327327
runner = MonitorRunnerService
328328
.registerClusterService(clusterService)

alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@
55

66
package org.opensearch.alerting
77

8+
import kotlinx.coroutines.runBlocking
89
import org.junit.Before
10+
import org.mockito.ArgumentCaptor
11+
import org.mockito.ArgumentMatchers.any
912
import org.mockito.Mockito
13+
import org.mockito.Mockito.verify
1014
import org.opensearch.Version
15+
import org.opensearch.action.bulk.BackoffPolicy
1116
import org.opensearch.alerting.alerts.AlertIndices
1217
import org.opensearch.alerting.settings.AlertingSettings
1318
import org.opensearch.alerting.util.getBucketKeysHash
@@ -16,19 +21,26 @@ import org.opensearch.cluster.service.ClusterService
1621
import org.opensearch.common.settings.ClusterSettings
1722
import org.opensearch.common.settings.Setting
1823
import org.opensearch.common.settings.Settings
24+
import org.opensearch.common.unit.TimeValue
1925
import org.opensearch.commons.alerting.model.AggregationResultBucket
2026
import org.opensearch.commons.alerting.model.Alert
2127
import org.opensearch.commons.alerting.model.BucketLevelTrigger
28+
import org.opensearch.commons.alerting.model.DataSources
2229
import org.opensearch.commons.alerting.model.Monitor
2330
import org.opensearch.commons.alerting.model.action.AlertCategory
2431
import org.opensearch.core.xcontent.NamedXContentRegistry
32+
import org.opensearch.remote.metadata.client.BulkDataObjectRequest
33+
import org.opensearch.remote.metadata.client.BulkDataObjectResponse
2534
import org.opensearch.remote.metadata.client.SdkClient
2635
import org.opensearch.test.ClusterServiceUtils
2736
import org.opensearch.test.OpenSearchTestCase
2837
import org.opensearch.threadpool.ThreadPool
2938
import org.opensearch.transport.client.Client
3039
import java.time.Instant
3140
import java.time.temporal.ChronoUnit
41+
import java.util.concurrent.CompletableFuture
42+
import org.mockito.Mockito.`when` as whenever
43+
3244
class AlertServiceTests : OpenSearchTestCase() {
3345

3446
private lateinit var client: Client
@@ -39,6 +51,7 @@ class AlertServiceTests : OpenSearchTestCase() {
3951

4052
private lateinit var alertIndices: AlertIndices
4153
private lateinit var alertService: AlertService
54+
private lateinit var sdkClient: SdkClient
4255

4356
@Before
4457
fun setup() {
@@ -47,6 +60,7 @@ class AlertServiceTests : OpenSearchTestCase() {
4760
xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java)
4861
threadPool = Mockito.mock(ThreadPool::class.java)
4962
clusterService = Mockito.mock(ClusterService::class.java)
63+
sdkClient = Mockito.mock(SdkClient::class.java)
5064
settings = Settings.builder().build()
5165
val settingSet = hashSetOf<Setting<*>>()
5266
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
@@ -67,7 +81,7 @@ class AlertServiceTests : OpenSearchTestCase() {
6781
clusterService = Mockito.spy(testClusterService)
6882

6983
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
70-
alertService = AlertService(client, xContentRegistry, alertIndices, Mockito.mock(SdkClient::class.java))
84+
alertService = AlertService(client, xContentRegistry, alertIndices, sdkClient)
7185
}
7286

7387
fun `test getting categorized alerts for bucket-level monitor with no current alerts`() {
@@ -215,6 +229,84 @@ class AlertServiceTests : OpenSearchTestCase() {
215229
assertAlertsExistForBucketKeys(emptyList(), completedAlerts)
216230
}
217231

232+
fun `test saveAlerts COMPLETED state with multiTenancyEnabled puts to alertsIndex without delete`() {
233+
val multiTenantAlertService = AlertService(client, xContentRegistry, alertIndices, sdkClient, multiTenancyEnabled = true)
234+
235+
val trigger = randomBucketLevelTrigger()
236+
val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))
237+
val alert = Alert(
238+
monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
239+
actionExecutionResults = listOf(randomActionExecutionResult())
240+
).copy(state = Alert.State.COMPLETED, endTime = Instant.now())
241+
242+
val bulkResponse = Mockito.mock(BulkDataObjectResponse::class.java)
243+
whenever(bulkResponse.responses).thenReturn(emptyList())
244+
whenever(sdkClient.bulkDataObjectAsync(any(BulkDataObjectRequest::class.java)))
245+
.thenReturn(CompletableFuture.completedFuture(bulkResponse))
246+
247+
runBlocking {
248+
multiTenantAlertService.saveAlerts(
249+
DataSources(),
250+
listOf(alert),
251+
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 1),
252+
routingId = monitor.id
253+
)
254+
}
255+
256+
val captor = ArgumentCaptor.forClass(BulkDataObjectRequest::class.java)
257+
verify(sdkClient).bulkDataObjectAsync(captor.capture())
258+
val capturedRequest = captor.value
259+
// Should have only put requests (no delete requests) when multiTenancyEnabled
260+
assertTrue("Expected put requests in bulk", capturedRequest.requests().isNotEmpty())
261+
assertTrue(
262+
"Expected PutDataObjectRequest targeting alertsIndex",
263+
capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.PutDataObjectRequest }
264+
)
265+
assertFalse(
266+
"Expected no DeleteDataObjectRequest when multiTenancyEnabled",
267+
capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.DeleteDataObjectRequest }
268+
)
269+
}
270+
271+
fun `test saveAlerts COMPLETED state without multiTenancyEnabled creates delete and history put`() {
272+
val nonMultiTenantAlertService = AlertService(client, xContentRegistry, alertIndices, sdkClient, multiTenancyEnabled = false)
273+
274+
val trigger = randomBucketLevelTrigger()
275+
val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))
276+
val alert = Alert(
277+
monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
278+
actionExecutionResults = listOf(randomActionExecutionResult())
279+
).copy(state = Alert.State.COMPLETED, endTime = Instant.now())
280+
281+
val bulkResponse = Mockito.mock(BulkDataObjectResponse::class.java)
282+
whenever(bulkResponse.responses).thenReturn(emptyList())
283+
whenever(sdkClient.bulkDataObjectAsync(any(BulkDataObjectRequest::class.java)))
284+
.thenReturn(CompletableFuture.completedFuture(bulkResponse))
285+
286+
runBlocking {
287+
nonMultiTenantAlertService.saveAlerts(
288+
DataSources(),
289+
listOf(alert),
290+
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 1),
291+
routingId = monitor.id
292+
)
293+
}
294+
295+
val captor = ArgumentCaptor.forClass(BulkDataObjectRequest::class.java)
296+
verify(sdkClient).bulkDataObjectAsync(captor.capture())
297+
val capturedRequest = captor.value
298+
// Should have a delete request for the alertsIndex
299+
assertTrue(
300+
"Expected DeleteDataObjectRequest when multiTenancy disabled",
301+
capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.DeleteDataObjectRequest }
302+
)
303+
// Should have a put request for alertsHistoryIndex (history enabled by default)
304+
assertTrue(
305+
"Expected PutDataObjectRequest for history index",
306+
capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.PutDataObjectRequest }
307+
)
308+
}
309+
218310
private fun createCurrentAlertsFromBucketKeys(
219311
monitor: Monitor,
220312
trigger: BucketLevelTrigger,

0 commit comments

Comments
 (0)