Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 49 additions & 14 deletions src/robusta/model/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
from robusta.core.pubsub.event_emitter import EventEmitter
from robusta.core.pubsub.event_subscriber import EventHandler
from robusta.core.pubsub.events_pubsub import EventsPubSub
from robusta.core.sinks.robusta.robusta_sink_params import RobustaSinkConfigWrapper, RobustaSinkParams
from robusta.core.sinks.robusta.robusta_sink_params import (
RobustaSinkConfigWrapper,
RobustaSinkParams,
)
from robusta.core.sinks.sink_base import SinkBase
from robusta.core.sinks.sink_config import SinkConfigBase
from robusta.core.sinks.sink_factory import SinkFactory
from robusta.integrations.receiver import ActionRequestReceiver
from robusta.integrations.scheduled.playbook_scheduler_manager import PlaybooksSchedulerManager
from robusta.integrations.scheduled.playbook_scheduler_manager import (
PlaybooksSchedulerManager,
)
from robusta.model.alert_relabel_config import AlertRelabel
from robusta.model.playbook_definition import PlaybookDefinition
from robusta.runner.telemetry import Telemetry
Expand All @@ -26,8 +31,14 @@ def __init__(self, sinks: Dict[str, SinkBase]):
self.sinks = sinks
self.default_sinks = [sink.sink_name for sink in sinks.values() if sink.default]
if not self.default_sinks:
logging.warning("No default sinks defined. By default, actions results are ignored.")
platform_sinks = [sink for sink in sinks.values() if isinstance(sink.params, RobustaSinkParams)]
logging.warning(
"No default sinks defined. By default, actions results are ignored."
)
platform_sinks = [
sink
for sink in sinks.values()
if isinstance(sink.params, RobustaSinkParams)
]
self.platform_enabled = len(platform_sinks) > 0

def get_sink_by_name(self, sink_name: str) -> Optional[SinkBase]:
Expand All @@ -45,16 +56,24 @@ def construct_new_sinks(
) -> Dict[str, SinkBase]:
new_sink_names = [sink_config.get_name() for sink_config in new_sinks_config]
# remove deleted sinks
deleted_sink_names = [sink_name for sink_name in existing_sinks.keys() if sink_name not in new_sink_names]
deleted_sink_names = [
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RoiGlinik I don't think you can change the order of the sinks
the order of the sinks matters

At least 2 places that I can think of:

  1. using stop on a sink - to the finding not continue to others
  2. Taking a value from one sink, and adding it to others (for example, taking the alert id from OpsGenie, and putting it in a Slack sink.

Please try to think of a different solution to solve this

sink_name
for sink_name in existing_sinks.keys()
if sink_name not in new_sink_names
]
for deleted_sink in deleted_sink_names:
logging.info(f"Deleting sink {deleted_sink}")
existing_sinks[deleted_sink].stop()
del existing_sinks[deleted_sink]

new_sinks: Dict[str, SinkBase] = dict()

# Reload sinks, order does matter and should be loaded & added to the dict by config order.
for sink_config in new_sinks_config:

# Reload sinks, order does matter. Load Robusta sink last because it send status to database.
# In case of multiple sinks and a sink config issue runner crashes and we don't want status to be sent until all sinks are created.
new_sinks_config_sorted = sorted(
new_sinks_config, key=lambda x: isinstance(x, RobustaSinkConfigWrapper)
)
for sink_config in new_sinks_config_sorted:
# temporary workaround to skip the default and unconfigured robusta token
if (
isinstance(sink_config, RobustaSinkConfigWrapper)
Expand All @@ -70,10 +89,19 @@ def construct_new_sinks(
continue

is_global_config_changed = exists_sink.is_global_config_changed()
is_sink_changed = sink_config.get_params() != exists_sink.params or is_global_config_changed
is_sink_changed = (
sink_config.get_params() != exists_sink.params
or is_global_config_changed
)
if is_sink_changed:
config_change_msg = "due to global config change" if is_global_config_changed else "due to param change"
logging.info(f"Updating {type(sink_config)} sink named {sink_config.get_name()} {config_change_msg}")
config_change_msg = (
"due to global config change"
if is_global_config_changed
else "due to param change"
)
logging.info(
f"Updating {type(sink_config)} sink named {sink_config.get_name()} {config_change_msg}"
)
exists_sink.stop()
new_sinks[sink_name] = SinkFactory.create_sink(sink_config, registry)
continue
Expand Down Expand Up @@ -120,10 +148,14 @@ def __init__(
raise Exception(msg)
action.set_func_hash(get_function_hash(action_def.func))
if action_def.params_type: # action has params
action.action_params = merge_global_params(global_config, action.action_params)
action.action_params = merge_global_params(
global_config, action.action_params
)
if getattr(action_def.params_type, "pre_deploy_func", None):
for trigger in playbook_def.triggers:
action_params = action_def.params_type(**action.action_params)
action_params = action_def.params_type(
**action.action_params
)
action_params.pre_deploy_func(trigger.get())

# validate that the action can be triggered by all playbooks triggers
Expand All @@ -138,7 +170,10 @@ def __init__(

# add the playbook only once for each event.
playbooks_trigger_events = set(
[trigger_definition.get().get_trigger_event() for trigger_definition in playbook_def.triggers]
[
trigger_definition.get().get_trigger_event()
for trigger_definition in playbook_def.triggers
]
)
for event in playbooks_trigger_events:
self.triggers_to_playbooks[event].append(playbook_def)
Expand Down
Loading