diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 3cfedf36e..c3ddb26c3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -71,7 +71,8 @@ class AlertService( val client: Client, val xContentRegistry: NamedXContentRegistry, val alertIndices: AlertIndices, - val sdkClient: SdkClient + val sdkClient: SdkClient, + val multiTenancyEnabled: Boolean = false ) { companion object { @@ -745,18 +746,10 @@ class AlertService( throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert") } Alert.State.COMPLETED -> { - deleteRequests.add( - DeleteDataObjectRequest.builder() - .index(alertsIndex) - .id(alert.id) - .routing(routingId) - .tenantId(currentTenantId()) - .build() - ) - if (alertIndices.isAlertHistoryEnabled()) { + if (multiTenancyEnabled) { putRequests.add( PutDataObjectRequest.builder() - .index(alertsHistoryIndex) + .index(alertsIndex) .id(alert.id) .routing(routingId) .tenantId(currentTenantId()) @@ -765,7 +758,28 @@ class AlertService( .build() ) } else { - commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id))) + deleteRequests.add( + DeleteDataObjectRequest.builder() + .index(alertsIndex) + .id(alert.id) + .routing(routingId) + .tenantId(currentTenantId()) + .build() + ) + if (alertIndices.isAlertHistoryEnabled()) { + putRequests.add( + PutDataObjectRequest.builder() + .index(alertsHistoryIndex) + .id(alert.id) + .routing(routingId) + .tenantId(currentTenantId()) + .overwriteIfExists(true) + .dataObject(ToXContentObject { builder, _ -> alert.toXContentWithUser(builder) }) + .build() + ) + } else { + commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id))) + } } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index e7b440a2e..a2913ce33 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -322,7 +322,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R client.threadPool().executor(ThreadPool.Names.GENERIC) ) - val alertService = AlertService(client, xContentRegistry, alertIndices, sdkClient) + val alertService = AlertService(client, xContentRegistry, alertIndices, sdkClient, MULTI_TENANCY_ENABLED.get(settings)) val triggerService = TriggerService(scriptService) runner = MonitorRunnerService .registerClusterService(clusterService) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt index 59e9608fc..5e19bc7ed 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt @@ -5,9 +5,14 @@ package org.opensearch.alerting +import kotlinx.coroutines.runBlocking import org.junit.Before +import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatchers.any import org.mockito.Mockito +import org.mockito.Mockito.verify import org.opensearch.Version +import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.getBucketKeysHash @@ -16,12 +21,16 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.settings.Setting import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.action.AlertCategory import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.remote.metadata.client.BulkDataObjectRequest +import org.opensearch.remote.metadata.client.BulkDataObjectResponse import org.opensearch.remote.metadata.client.SdkClient import org.opensearch.test.ClusterServiceUtils import org.opensearch.test.OpenSearchTestCase @@ -29,6 +38,9 @@ import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.client.Client import java.time.Instant import java.time.temporal.ChronoUnit +import java.util.concurrent.CompletableFuture +import org.mockito.Mockito.`when` as whenever + class AlertServiceTests : OpenSearchTestCase() { private lateinit var client: Client @@ -39,6 +51,7 @@ class AlertServiceTests : OpenSearchTestCase() { private lateinit var alertIndices: AlertIndices private lateinit var alertService: AlertService + private lateinit var sdkClient: SdkClient @Before fun setup() { @@ -47,6 +60,7 @@ class AlertServiceTests : OpenSearchTestCase() { xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java) threadPool = Mockito.mock(ThreadPool::class.java) clusterService = Mockito.mock(ClusterService::class.java) + sdkClient = Mockito.mock(SdkClient::class.java) settings = Settings.builder().build() val settingSet = hashSetOf>() settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) @@ -67,7 +81,7 @@ class AlertServiceTests : OpenSearchTestCase() { clusterService = Mockito.spy(testClusterService) alertIndices = AlertIndices(settings, client, threadPool, clusterService) - alertService = AlertService(client, xContentRegistry, alertIndices, Mockito.mock(SdkClient::class.java)) + alertService = AlertService(client, xContentRegistry, alertIndices, sdkClient) } fun `test getting categorized alerts for bucket-level monitor with no current alerts`() { @@ -215,6 +229,84 @@ class AlertServiceTests : OpenSearchTestCase() { assertAlertsExistForBucketKeys(emptyList(), completedAlerts) } + fun `test saveAlerts COMPLETED state with multiTenancyEnabled puts to alertsIndex without delete`() { + val multiTenantAlertService = AlertService(client, xContentRegistry, alertIndices, sdkClient, multiTenancyEnabled = true) + + val trigger = randomBucketLevelTrigger() + val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) + val alert = Alert( + monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, + actionExecutionResults = listOf(randomActionExecutionResult()) + ).copy(state = Alert.State.COMPLETED, endTime = Instant.now()) + + val bulkResponse = Mockito.mock(BulkDataObjectResponse::class.java) + whenever(bulkResponse.responses).thenReturn(emptyList()) + whenever(sdkClient.bulkDataObjectAsync(any(BulkDataObjectRequest::class.java))) + .thenReturn(CompletableFuture.completedFuture(bulkResponse)) + + runBlocking { + multiTenantAlertService.saveAlerts( + DataSources(), + listOf(alert), + BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 1), + routingId = monitor.id + ) + } + + val captor = ArgumentCaptor.forClass(BulkDataObjectRequest::class.java) + verify(sdkClient).bulkDataObjectAsync(captor.capture()) + val capturedRequest = captor.value + // Should have only put requests (no delete requests) when multiTenancyEnabled + assertTrue("Expected put requests in bulk", capturedRequest.requests().isNotEmpty()) + assertTrue( + "Expected PutDataObjectRequest targeting alertsIndex", + capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.PutDataObjectRequest } + ) + assertFalse( + "Expected no DeleteDataObjectRequest when multiTenancyEnabled", + capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.DeleteDataObjectRequest } + ) + } + + fun `test saveAlerts COMPLETED state without multiTenancyEnabled creates delete and history put`() { + val nonMultiTenantAlertService = AlertService(client, xContentRegistry, alertIndices, sdkClient, multiTenancyEnabled = false) + + val trigger = randomBucketLevelTrigger() + val monitor = randomBucketLevelMonitor(triggers = listOf(trigger)) + val alert = Alert( + monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, + actionExecutionResults = listOf(randomActionExecutionResult()) + ).copy(state = Alert.State.COMPLETED, endTime = Instant.now()) + + val bulkResponse = Mockito.mock(BulkDataObjectResponse::class.java) + whenever(bulkResponse.responses).thenReturn(emptyList()) + whenever(sdkClient.bulkDataObjectAsync(any(BulkDataObjectRequest::class.java))) + .thenReturn(CompletableFuture.completedFuture(bulkResponse)) + + runBlocking { + nonMultiTenantAlertService.saveAlerts( + DataSources(), + listOf(alert), + BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 1), + routingId = monitor.id + ) + } + + val captor = ArgumentCaptor.forClass(BulkDataObjectRequest::class.java) + verify(sdkClient).bulkDataObjectAsync(captor.capture()) + val capturedRequest = captor.value + // Should have a delete request for the alertsIndex + assertTrue( + "Expected DeleteDataObjectRequest when multiTenancy disabled", + capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.DeleteDataObjectRequest } + ) + // Should have a put request for alertsHistoryIndex (history enabled by default) + assertTrue( + "Expected PutDataObjectRequest for history index", + capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.PutDataObjectRequest } + ) + } + private fun createCurrentAlertsFromBucketKeys( monitor: Monitor, trigger: BucketLevelTrigger,