Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class AlertService(
val client: Client,
val xContentRegistry: NamedXContentRegistry,
val alertIndices: AlertIndices,
val sdkClient: SdkClient
val sdkClient: SdkClient,
val multiTenancyEnabled: Boolean = false
) {

companion object {
Expand Down Expand Up @@ -745,18 +746,10 @@ class AlertService(
throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert")
}
Alert.State.COMPLETED -> {
deleteRequests.add(
DeleteDataObjectRequest.builder()
.index(alertsIndex)
.id(alert.id)
.routing(routingId)
.tenantId(currentTenantId())
.build()
)
if (alertIndices.isAlertHistoryEnabled()) {
if (multiTenancyEnabled) {
Comment thread
vikhy-aws marked this conversation as resolved.
putRequests.add(
PutDataObjectRequest.builder()
.index(alertsHistoryIndex)
.index(alertsIndex)
.id(alert.id)
.routing(routingId)
.tenantId(currentTenantId())
Expand All @@ -765,7 +758,28 @@ class AlertService(
.build()
)
} else {
commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id)))
Comment thread
vikhy-aws marked this conversation as resolved.
deleteRequests.add(
DeleteDataObjectRequest.builder()
.index(alertsIndex)
.id(alert.id)
.routing(routingId)
.tenantId(currentTenantId())
.build()
)
if (alertIndices.isAlertHistoryEnabled()) {
putRequests.add(
PutDataObjectRequest.builder()
.index(alertsHistoryIndex)
.id(alert.id)
.routing(routingId)
.tenantId(currentTenantId())
.overwriteIfExists(true)
.dataObject(ToXContentObject { builder, _ -> alert.toXContentWithUser(builder) })
.build()
)
} else {
commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id)))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
client.threadPool().executor(ThreadPool.Names.GENERIC)
)

val alertService = AlertService(client, xContentRegistry, alertIndices, sdkClient)
val alertService = AlertService(client, xContentRegistry, alertIndices, sdkClient, MULTI_TENANCY_ENABLED.get(settings))
val triggerService = TriggerService(scriptService)
runner = MonitorRunnerService
.registerClusterService(clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@

package org.opensearch.alerting

import kotlinx.coroutines.runBlocking
import org.junit.Before
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito
import org.mockito.Mockito.verify
import org.opensearch.Version
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.getBucketKeysHash
Expand All @@ -16,19 +21,26 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Setting
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.model.AggregationResultBucket
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.remote.metadata.client.BulkDataObjectRequest
import org.opensearch.remote.metadata.client.BulkDataObjectResponse
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.test.ClusterServiceUtils
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.client.Client
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.CompletableFuture
import org.mockito.Mockito.`when` as whenever

class AlertServiceTests : OpenSearchTestCase() {

private lateinit var client: Client
Expand All @@ -39,6 +51,7 @@ class AlertServiceTests : OpenSearchTestCase() {

private lateinit var alertIndices: AlertIndices
private lateinit var alertService: AlertService
private lateinit var sdkClient: SdkClient

@Before
fun setup() {
Expand All @@ -47,6 +60,7 @@ class AlertServiceTests : OpenSearchTestCase() {
xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java)
threadPool = Mockito.mock(ThreadPool::class.java)
clusterService = Mockito.mock(ClusterService::class.java)
sdkClient = Mockito.mock(SdkClient::class.java)
settings = Settings.builder().build()
val settingSet = hashSetOf<Setting<*>>()
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
Expand All @@ -67,7 +81,7 @@ class AlertServiceTests : OpenSearchTestCase() {
clusterService = Mockito.spy(testClusterService)

alertIndices = AlertIndices(settings, client, threadPool, clusterService)
alertService = AlertService(client, xContentRegistry, alertIndices, Mockito.mock(SdkClient::class.java))
alertService = AlertService(client, xContentRegistry, alertIndices, sdkClient)
}

fun `test getting categorized alerts for bucket-level monitor with no current alerts`() {
Expand Down Expand Up @@ -215,6 +229,84 @@ class AlertServiceTests : OpenSearchTestCase() {
assertAlertsExistForBucketKeys(emptyList(), completedAlerts)
}

fun `test saveAlerts COMPLETED state with multiTenancyEnabled puts to alertsIndex without delete`() {
val multiTenantAlertService = AlertService(client, xContentRegistry, alertIndices, sdkClient, multiTenancyEnabled = true)

val trigger = randomBucketLevelTrigger()
val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))
val alert = Alert(
monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = listOf(randomActionExecutionResult())
).copy(state = Alert.State.COMPLETED, endTime = Instant.now())

val bulkResponse = Mockito.mock(BulkDataObjectResponse::class.java)
whenever(bulkResponse.responses).thenReturn(emptyList())
whenever(sdkClient.bulkDataObjectAsync(any(BulkDataObjectRequest::class.java)))
.thenReturn(CompletableFuture.completedFuture(bulkResponse))

runBlocking {
multiTenantAlertService.saveAlerts(
DataSources(),
listOf(alert),
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 1),
routingId = monitor.id
)
}

val captor = ArgumentCaptor.forClass(BulkDataObjectRequest::class.java)
verify(sdkClient).bulkDataObjectAsync(captor.capture())
val capturedRequest = captor.value
// Should have only put requests (no delete requests) when multiTenancyEnabled
assertTrue("Expected put requests in bulk", capturedRequest.requests().isNotEmpty())
assertTrue(
"Expected PutDataObjectRequest targeting alertsIndex",
capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.PutDataObjectRequest }
)
assertFalse(
"Expected no DeleteDataObjectRequest when multiTenancyEnabled",
capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.DeleteDataObjectRequest }
)
}

fun `test saveAlerts COMPLETED state without multiTenancyEnabled creates delete and history put`() {
val nonMultiTenantAlertService = AlertService(client, xContentRegistry, alertIndices, sdkClient, multiTenancyEnabled = false)

val trigger = randomBucketLevelTrigger()
val monitor = randomBucketLevelMonitor(triggers = listOf(trigger))
val alert = Alert(
monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = listOf(randomActionExecutionResult())
).copy(state = Alert.State.COMPLETED, endTime = Instant.now())

val bulkResponse = Mockito.mock(BulkDataObjectResponse::class.java)
whenever(bulkResponse.responses).thenReturn(emptyList())
whenever(sdkClient.bulkDataObjectAsync(any(BulkDataObjectRequest::class.java)))
.thenReturn(CompletableFuture.completedFuture(bulkResponse))

runBlocking {
nonMultiTenantAlertService.saveAlerts(
DataSources(),
listOf(alert),
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 1),
routingId = monitor.id
)
}

val captor = ArgumentCaptor.forClass(BulkDataObjectRequest::class.java)
verify(sdkClient).bulkDataObjectAsync(captor.capture())
val capturedRequest = captor.value
// Should have a delete request for the alertsIndex
assertTrue(
"Expected DeleteDataObjectRequest when multiTenancy disabled",
capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.DeleteDataObjectRequest }
)
// Should have a put request for alertsHistoryIndex (history enabled by default)
assertTrue(
"Expected PutDataObjectRequest for history index",
capturedRequest.requests().any { it is org.opensearch.remote.metadata.client.PutDataObjectRequest }
)
}

private fun createCurrentAlertsFromBucketKeys(
monitor: Monitor,
trigger: BucketLevelTrigger,
Expand Down