diff --git a/alerting/build.gradle b/alerting/build.gradle index 0114422d3..1a8f5b75b 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -392,6 +392,46 @@ integTest { excludeTestsMatching "org.opensearch.alerting.bwc.*IT" } } + + task integTestPluggableDataformat(type: RestIntegTestTask) { + description = "Runs integration tests with pluggable dataformat feature flag enabled" + dependsOn bundlePlugin + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath + + systemProperty 'tests.security.manager', 'false' + systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath + + filter { + includeTestsMatching "org.opensearch.alerting.transport.PluggableDataFormatMonitorBlockIT" + } + } + + testClusters.integTestPluggableDataformat { + testDistribution = "ARCHIVE" + systemProperty 'opensearch.experimental.feature.pluggable.dataformat.enabled', 'true' + + // Include same plugins as main integTest cluster + plugin(provider({ new RegularFile() { @Override File getAsFile() { return configurations.zipArchive.asFileTree.matching { include '**/opensearch-notifications-core*' }.singleFile } } })) + plugin(provider({ new RegularFile() { @Override File getAsFile() { return configurations.zipArchive.asFileTree.matching { include '**/notifications*' }.singleFile } } })) + plugin(provider({ new RegularFile() { @Override File getAsFile() { return configurations.zipArchive.asFileTree.matching { include '**/opensearch-job-scheduler*' }.singleFile } } })) + plugin(provider({ new RegularFile() { @Override File getAsFile() { return configurations.zipArchive.asFileTree.matching { include '**/opensearch-sql-plugin*' }.singleFile } } })) + + } + + // Install alerting plugin on the pluggable dataformat test cluster + integTestPluggableDataformat.dependsOn(bundle) + integTestPluggableDataformat.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))} + + // Exclude pluggable dataformat tests from the main integTest task + integTest { + filter { + excludeTestsMatching "org.opensearch.alerting.transport.PluggableDataFormatMonitorBlockIT" + } + } + + check.dependsOn integTestPluggableDataformat + integTest.finalizedBy integTestPluggableDataformat } task integTestRemote(type: RestIntegTestTask) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index 341cd9dff..a645ed92d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -27,6 +27,7 @@ import org.opensearch.alerting.util.use import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings +import org.opensearch.common.util.FeatureFlags import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType @@ -35,6 +36,7 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.util.AlertingException import org.opensearch.commons.alerting.util.isMonitorOfStandardType +import org.opensearch.commons.alerting.util.isPPLMonitor import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.TenantContext import org.opensearch.core.action.ActionListener @@ -207,6 +209,22 @@ class TransportExecuteMonitorAction @Inject constructor( return@use } + // Block non-PPL monitors on pluggable dataformat domains + if (FeatureFlags.isEnabled(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG) && + !monitor.isPPLMonitor() + ) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Only PPL monitors are supported on this domain." + + " ${monitor.monitorType} monitors are not allowed.", + RestStatus.FORBIDDEN + ) + ) + ) + return@use + } + if ( monitor.isMonitorOfStandardType() && Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index c2d8f26a4..379c37456 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -62,6 +62,7 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.common.util.FeatureFlags import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentHelper @@ -85,6 +86,7 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonit import org.opensearch.commons.alerting.model.userErrorMessage import org.opensearch.commons.alerting.util.AlertingException import org.opensearch.commons.alerting.util.isMonitorOfStandardType +import org.opensearch.commons.alerting.util.isPPLMonitor import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.TenantContext import org.opensearch.commons.utils.recreateObject @@ -213,6 +215,22 @@ class TransportIndexMonitorAction @Inject constructor( return } + // Block non-PPL monitors on pluggable dataformat domains + if (FeatureFlags.isEnabled(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG) && + !transformedRequest.monitor.isPPLMonitor() + ) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Only PPL monitors are supported on this domain." + + " ${transformedRequest.monitor.monitorType} monitors are not allowed.", + RestStatus.FORBIDDEN + ) + ) + ) + return + } + val user = readUserFromThreadContext(client) if (!validateUserBackendRoles(user, actionListener)) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index fe4afd0fc..e7d3a08cb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -51,6 +51,7 @@ import org.opensearch.alerting.workflow.CompositeWorkflowRunner import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings +import org.opensearch.common.util.FeatureFlags import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentFactory.jsonBuilder import org.opensearch.common.xcontent.XContentHelper @@ -150,6 +151,22 @@ class TransportIndexWorkflowAction @Inject constructor( return } + // Block workflow creation on pluggable dataformat domains + if (FeatureFlags.isEnabled( + FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG + ) + ) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Workflow operations are not supported on this domain.", + RestStatus.FORBIDDEN + ) + ) + ) + return + } + val transformedRequest = request as? IndexWorkflowRequest ?: recreateObject(request, namedWriteableRegistry) { IndexWorkflowRequest(it) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/PluggableDataFormatMonitorBlockIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/PluggableDataFormatMonitorBlockIT.kt new file mode 100644 index 000000000..7ad2f22c9 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/PluggableDataFormatMonitorBlockIT.kt @@ -0,0 +1,319 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.randomWorkflow +import org.opensearch.client.ResponseException +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.IntervalSchedule +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.PPLInput +import org.opensearch.commons.alerting.model.PPLTrigger +import org.opensearch.commons.alerting.model.QueryLevelTrigger +import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.core.rest.RestStatus +import org.opensearch.index.query.QueryBuilders +import org.opensearch.script.Script +import org.opensearch.search.aggregations.AggregationBuilders +import org.opensearch.search.builder.SearchSourceBuilder +import java.time.Instant +import java.time.temporal.ChronoUnit + +class PluggableDataFormatMonitorBlockIT : AlertingRestTestCase() { + + fun `test create doc-level monitor blocked on pluggable dataformat domain`() { + val testIndex = createTestIndex() + val monitor = Monitor( + name = "test-doc-monitor", + monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, + enabled = false, + schedule = IntervalSchedule(5, ChronoUnit.MINUTES), + lastUpdateTime = Instant.now(), + enabledTime = null, + user = null, + inputs = listOf( + DocLevelMonitorInput( + "desc", + listOf(testIndex), + listOf(DocLevelQuery("query1", "query1", listOf(), """{"match_all": {}}""")) + ) + ), + triggers = emptyList(), + uiMetadata = mapOf() + ) + + try { + createMonitor(monitor) + fail("Expected monitor creation to be blocked on pluggable dataformat domain") + } catch (e: ResponseException) { + assertEquals(RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + val message = e.message ?: "" + assertTrue( + "Expected FORBIDDEN error for non-PPL monitor on pluggable dataformat domain, got: $message", + message.contains("Only PPL monitors are supported") || message.contains("monitors are not allowed") + ) + } + } + + fun `test create PPL monitor not blocked by pluggable dataformat gate`() { + val testIndex = createTestIndex() + val monitor = Monitor( + name = "test-ppl-monitor", + monitorType = Monitor.MonitorType.PPL_MONITOR.value, + enabled = false, + schedule = IntervalSchedule(5, ChronoUnit.MINUTES), + lastUpdateTime = Instant.now(), + enabledTime = null, + user = null, + inputs = listOf( + PPLInput( + query = "source = $testIndex | head 10", + queryLanguage = PPLInput.QueryLanguage.PPL + ) + ), + triggers = listOf( + PPLTrigger( + name = "test-trigger", + severity = "1", + actions = emptyList(), + conditionType = PPLTrigger.ConditionType.NUMBER_OF_RESULTS, + numResultsCondition = PPLTrigger.NumResultsCondition.GREATER_THAN, + numResultsValue = 0L, + customCondition = null + ) + ), + uiMetadata = mapOf() + ) + + try { + createMonitor(monitor) + } catch (e: Exception) { + // PPL validation requires the SQL plugin which is not loaded in this test. + // The important assertion is that the error is NOT from the pluggable dataformat gate. + val message = e.message ?: "" + assertFalse( + "PPL monitor should not be blocked by pluggable dataformat gate, got: $message", + message.contains("Only PPL monitors are supported") || message.contains("monitors are not allowed") + ) + } + } + + fun `test update query-level monitor blocked on pluggable dataformat domain`() { + val testIndex = createTestIndex() + val monitor = Monitor( + name = "test-query-monitor", + monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, + enabled = false, + schedule = IntervalSchedule(5, ChronoUnit.MINUTES), + lastUpdateTime = Instant.now(), + enabledTime = null, + user = null, + inputs = listOf( + SearchInput( + listOf(testIndex), + SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) + ) + ), + triggers = listOf( + QueryLevelTrigger( + name = "trigger", + severity = "1", + condition = Script("return true"), + actions = emptyList() + ) + ), + uiMetadata = mapOf() + ) + + try { + updateMonitor(monitor.copy(id = "some-monitor-id")) + fail("Expected monitor update to be blocked on pluggable dataformat domain") + } catch (e: ResponseException) { + assertEquals(RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + val message = e.message ?: "" + assertTrue( + "Expected FORBIDDEN error for non-PPL monitor update on pluggable dataformat domain, got: $message", + message.contains("Only PPL monitors are supported") || message.contains("monitors are not allowed") + ) + } + } + + fun `test execute query-level monitor blocked on pluggable dataformat domain`() { + val testIndex = createTestIndex() + val monitor = Monitor( + name = "test-query-monitor", + monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, + enabled = false, + schedule = IntervalSchedule(5, ChronoUnit.MINUTES), + lastUpdateTime = Instant.now(), + enabledTime = null, + user = null, + inputs = listOf( + SearchInput( + listOf(testIndex), + SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) + ) + ), + triggers = listOf( + QueryLevelTrigger( + name = "trigger", + severity = "1", + condition = Script("return true"), + actions = emptyList() + ) + ), + uiMetadata = mapOf() + ) + + try { + executeMonitor(monitor, mapOf("dryrun" to "true")) + fail("Expected monitor execution to be blocked on pluggable dataformat domain") + } catch (e: ResponseException) { + assertEquals(RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + val message = e.message ?: "" + assertTrue( + "Expected FORBIDDEN error for non-PPL monitor execution on pluggable dataformat domain, got: $message", + message.contains("Only PPL monitors are supported") || message.contains("monitors are not allowed") + ) + } + } + + fun `test execute PPL monitor not blocked by pluggable dataformat gate`() { + val testIndex = createTestIndex() + val monitor = Monitor( + name = "test-ppl-monitor", + monitorType = Monitor.MonitorType.PPL_MONITOR.value, + enabled = false, + schedule = IntervalSchedule(5, ChronoUnit.MINUTES), + lastUpdateTime = Instant.now(), + enabledTime = null, + user = null, + inputs = listOf( + PPLInput( + query = "source = $testIndex | head 10", + queryLanguage = PPLInput.QueryLanguage.PPL + ) + ), + triggers = listOf( + PPLTrigger( + name = "test-trigger", + severity = "1", + actions = emptyList(), + conditionType = PPLTrigger.ConditionType.NUMBER_OF_RESULTS, + numResultsCondition = PPLTrigger.NumResultsCondition.GREATER_THAN, + numResultsValue = 0L, + customCondition = null + ) + ), + uiMetadata = mapOf() + ) + + try { + executeMonitor(monitor, mapOf("dryrun" to "true")) + } catch (e: Exception) { + // PPL execution requires the SQL plugin which is not loaded in this test. + // The important assertion is that the error is NOT from the pluggable dataformat gate. + val message = e.message ?: "" + assertFalse( + "PPL monitor execution should not be blocked by pluggable dataformat gate, got: $message", + message.contains("Only PPL monitors are supported") || message.contains("monitors are not allowed") + ) + } + } + + fun `test create bucket-level monitor blocked on pluggable dataformat domain`() { + val testIndex = createTestIndex() + val monitor = Monitor( + name = "test-bucket-monitor", + monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR.value, + enabled = false, + schedule = IntervalSchedule(5, ChronoUnit.MINUTES), + lastUpdateTime = Instant.now(), + enabledTime = null, + user = null, + inputs = listOf( + SearchInput( + listOf(testIndex), + SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .aggregation( + AggregationBuilders.terms("test_agg").field("test_field_1") + ) + ) + ), + triggers = emptyList(), + uiMetadata = mapOf() + ) + + try { + createMonitor(monitor) + fail("Expected monitor creation to be blocked on pluggable dataformat domain") + } catch (e: ResponseException) { + assertEquals(RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + val message = e.message ?: "" + assertTrue( + "Expected FORBIDDEN error for non-PPL monitor, got: $message", + message.contains("Only PPL monitors are supported") || message.contains("monitors are not allowed") + ) + } + } + + fun `test create cluster-metrics monitor blocked on pluggable dataformat domain`() { + val testIndex = createTestIndex() + val monitor = Monitor( + name = "test-cluster-metrics-monitor", + monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR.value, + enabled = false, + schedule = IntervalSchedule(5, ChronoUnit.MINUTES), + lastUpdateTime = Instant.now(), + enabledTime = null, + user = null, + inputs = listOf( + SearchInput( + listOf(testIndex), + SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) + ) + ), + triggers = listOf( + QueryLevelTrigger( + name = "trigger", + severity = "1", + condition = Script("return true"), + actions = emptyList() + ) + ), + uiMetadata = mapOf() + ) + + try { + createMonitor(monitor) + fail("Expected monitor creation to be blocked on pluggable dataformat domain") + } catch (e: ResponseException) { + assertEquals(RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + val message = e.message ?: "" + assertTrue( + "Expected FORBIDDEN error for non-PPL monitor, got: $message", + message.contains("Only PPL monitors are supported") || message.contains("monitors are not allowed") + ) + } + } + + fun `test create workflow blocked on pluggable dataformat domain`() { + try { + createWorkflow(randomWorkflow(monitorIds = listOf("dummy-id"))) + fail("Expected workflow creation to be blocked on pluggable dataformat domain") + } catch (e: ResponseException) { + assertEquals(RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + val message = e.message ?: "" + assertTrue( + "Expected FORBIDDEN error for workflow on pluggable dataformat domain, got: $message", + message.contains("Workflow operations are not supported") || message.contains("not supported on this domain") + ) + } + } +}