Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.PPLInput
import org.opensearch.commons.alerting.model.PPLTrigger
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
Expand Down Expand Up @@ -223,6 +225,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R

override fun getActions(): List<ActionPlugin.ActionHandler<out ActionRequest, out ActionResponse>> {
return listOf(
// Alerting V1
ActionPlugin.ActionHandler(ScheduledJobsStatsAction.INSTANCE, ScheduledJobsStatsTransportAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_MONITOR_ACTION_TYPE, TransportIndexMonitorAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_MONITOR_ACTION_TYPE, TransportGetMonitorAction::class.java),
Expand Down Expand Up @@ -258,12 +261,14 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
Monitor.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
DocLevelMonitorInput.XCONTENT_REGISTRY,
PPLInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY,
ChainedAlertTrigger.XCONTENT_REGISTRY,
RemoteMonitorTrigger.XCONTENT_REGISTRY,
PPLTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}
Expand Down Expand Up @@ -432,7 +437,13 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.COMMENTS_MAX_CONTENT_SIZE,
AlertingSettings.MAX_COMMENTS_PER_ALERT,
AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION,
AlertingSettings.NOTIFICATION_CONTEXT_RESULTS_ALLOWED_ROLES
AlertingSettings.NOTIFICATION_CONTEXT_RESULTS_ALLOWED_ROLES,
AlertingSettings.PPL_MONITOR_EXECUTION_MAX_DURATION,
AlertingSettings.PPL_MAX_QUERY_LENGTH,
AlertingSettings.PPL_QUERY_RESULTS_MAX_DATAROWS,
AlertingSettings.PPL_QUERY_RESULTS_MAX_SIZE,
AlertingSettings.NOTIFICATION_SUBJECT_SOURCE_MAX_LENGTH,
AlertingSettings.NOTIFICATION_MESSAGE_SOURCE_MAX_LENGTH
)
}

Expand Down
116 changes: 1 addition & 115 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,19 @@

package org.opensearch.alerting

import org.opensearch.OpenSearchSecurityException
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetDestinationsRequest
import org.opensearch.alerting.action.GetDestinationsResponse
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.util.destinationmigration.NotificationActionConfigs
import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils.Companion.getNotificationConfigInfo
import org.opensearch.alerting.util.destinationmigration.getTitle
import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification
import org.opensearch.alerting.util.destinationmigration.sendNotification
import org.opensearch.alerting.util.isAllowed
import org.opensearch.alerting.util.isTestAction
import org.opensearch.alerting.util.getConfigAndSendNotification
import org.opensearch.alerting.util.use
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.Table
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.core.common.Strings
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.node.NodeClient
import java.time.Instant

abstract class MonitorRunner {
Expand Down Expand Up @@ -93,103 +78,4 @@ abstract class MonitorRunner {
ActionRunResult(action.id, action.name, mapOf(), false, MonitorRunnerService.currentTime(), e)
}
}

protected suspend fun getConfigAndSendNotification(
action: Action,
monitorCtx: MonitorRunnerExecutionContext,
subject: String?,
message: String
): String {
val config = getConfigForNotificationAction(action, monitorCtx)
if (config.destination == null && config.channel == null) {
throw IllegalStateException("Unable to find a Notification Channel or Destination config with id [${action.destinationId}]")
}

// Adding a check on TEST_ACTION Destination type here to avoid supporting it as a LegacyBaseMessage type
// just for Alerting integration tests
if (config.destination?.isTestAction() == true) {
return "test action"
}

if (config.destination?.isAllowed(monitorCtx.allowList) == false) {
throw IllegalStateException(
"Monitor contains a Destination type that is not allowed: ${config.destination.type}"
)
}

var actionResponseContent = ""
actionResponseContent = config.channel
?.sendNotification(
monitorCtx.client!!,
config.channel.getTitle(subject),
message
) ?: actionResponseContent

actionResponseContent = config.destination
?.buildLegacyBaseMessage(subject, message, monitorCtx.destinationContextFactory!!.getDestinationContext(config.destination))
?.publishLegacyNotification(monitorCtx.client!!)
?: actionResponseContent

return actionResponseContent
}

/**
* The "destination" ID referenced in a Monitor Action could either be a Notification config or a Destination config
* depending on whether the background migration process has already migrated it from a Destination to a Notification config.
*
* To cover both of these cases, the Notification config will take precedence and if it is not found, the Destination will be retrieved.
*/
private suspend fun getConfigForNotificationAction(
action: Action,
monitorCtx: MonitorRunnerExecutionContext
): NotificationActionConfigs {
var destination: Destination? = null
var notificationPermissionException: Exception? = null

var channel: NotificationConfigInfo? = null
try {
channel = getNotificationConfigInfo(monitorCtx.client as NodeClient, action.destinationId)
} catch (e: OpenSearchSecurityException) {
notificationPermissionException = e
}

// If the channel was not found, try to retrieve the Destination
if (channel == null) {
destination = try {
val table = Table(
"asc",
"destination.name.keyword",
null,
1,
0,
null
)
val getDestinationsRequest = GetDestinationsRequest(
action.destinationId,
0L,
null,
table,
"ALL"
)

val getDestinationsResponse: GetDestinationsResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, it)
}
getDestinationsResponse.destinations.firstOrNull()
} catch (e: IllegalStateException) {
// Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned
null
} catch (e: OpenSearchSecurityException) {
if (notificationPermissionException != null)
throw notificationPermissionException
else
throw e
}

if (destination == null && notificationPermissionException != null)
throw notificationPermissionException
}

return NotificationActionConfigs(destination, channel)
}
}
40 changes: 39 additions & 1 deletion alerting/src/main/kotlin/org/opensearch/alerting/PPLUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode
import org.json.JSONObject
import org.opensearch.alerting.core.ppl.PPLPluginInterface
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.sql.plugin.transport.PPLQueryAction
import org.opensearch.sql.plugin.transport.TransportPPLQueryRequest
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse
import org.opensearch.transport.client.node.NodeClient

object PPLUtils {
Expand Down Expand Up @@ -101,7 +106,7 @@ object PPLUtils {
"/_plugins/_ppl"
}

// call PPL plugin to execute query
// prepare request to SQL/PPL Plugin
val transportPplQueryRequest = TransportPPLQueryRequest(
query,
JSONObject(mapOf("query" to query)),
Expand All @@ -119,6 +124,39 @@ object PPLUtils {
return mapper.readTree(transportPplQueryResponse.result)
}

fun executePplQuery(
query: String,
explain: Boolean,
client: NodeClient,
listener: ActionListener<TransportPPLQueryResponse>
) {
val path = if (explain) {
"/_plugins/_ppl/_explain"
} else {
"/_plugins/_ppl"
}

// prepare request to SQL/PPL Plugin
val request = TransportPPLQueryRequest(
query,
JSONObject(mapOf("query" to query)),
path
)

val wrappedListener = object : ActionListener<ActionResponse> {
override fun onResponse(response: ActionResponse) {
val recreated = recreateObject(response) { TransportPPLQueryResponse(it) }
listener.onResponse(recreated)
}

override fun onFailure(exception: Exception) {
listener.onFailure(exception)
}
} as ActionListener<TransportPPLQueryResponse>

client.execute(PPLQueryAction.INSTANCE, request, wrappedListener)
}

fun capAndReformatPPLQueryResults(rawQueryResults: JsonNode, maxSize: Long): List<Map<String, Any?>> {
val cappedQueryResults = capPPLQueryResultsSize(rawQueryResults, maxSize)
val cappedMap = mapper.convertValue(cappedQueryResults, Map::class.java) as Map<String, Any>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.model.Table
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.RestHandler.ReplacedRoute
import org.opensearch.rest.RestHandler.Route
import org.opensearch.rest.RestRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.PPLTrigger
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.util.AlertingException
Expand All @@ -43,7 +44,7 @@ import org.opensearch.rest.action.RestResponseListener
import org.opensearch.transport.client.node.NodeClient
import java.io.IOException
import java.time.Instant
import java.util.*
import java.util.Locale

private val log = LogManager.getLogger(RestIndexMonitorAction::class.java)

Expand Down Expand Up @@ -135,6 +136,14 @@ class RestIndexMonitorAction : BaseRestHandler() {
}
}
}

Monitor.MonitorType.PPL_MONITOR -> {
triggers.forEach {
if (it !is PPLTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for PPL monitor")
}
}
}
}
}
} catch (e: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.action.GetAlertsResponse
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.Alert.Companion.MONITOR_NAME_FIELD
import org.opensearch.commons.alerting.model.Alert.Companion.TRIGGER_NAME_FIELD
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.util.AlertingException
Expand Down Expand Up @@ -135,8 +137,8 @@ class TransportGetAlertsAction @Inject constructor(
QueryBuilders
.queryStringQuery(tableProp.searchString)
.defaultOperator(Operator.AND)
.field("monitor_name")
.field("trigger_name")
.field(MONITOR_NAME_FIELD)
.field(TRIGGER_NAME_FIELD)
)
}
val searchSourceBuilder = SearchSourceBuilder()
Expand Down
Loading