Skip to content
Merged
3,132 changes: 3,132 additions & 0 deletions run_conf1.data.xml

Large diffs are not rendered by default.

24 changes: 21 additions & 3 deletions src/drunc/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from functools import wraps
from typing import Callable, List, TypeVar

from daqpytools.logging.handlers import LogHandlerConf
from druncschema.authoriser_pb2 import ActionType, SystemType
from druncschema.broadcast_pb2 import BroadcastType
from druncschema.controller_pb2 import (
Expand All @@ -33,6 +34,7 @@
from druncschema.controller_pb2_grpc import ControllerServicer
from druncschema.description_pb2 import Description
from druncschema.generic_pb2 import PlainText, Stacktrace
from druncschema.opmon.FSM_pb2 import FSMStatus
from druncschema.opmon.generic_pb2 import RunInfo
from druncschema.request_response_pb2 import Response, ResponseFlag
from druncschema.token_pb2 import Token
Expand Down Expand Up @@ -219,14 +221,17 @@ class Controller(ControllerServicer):
children_nodes: List[ChildNode] = []

def __init__(self, configuration, name: str, session: str, token: Token):
"""C'tor. Note that controllers require the ERS variables defined
in OKS to exist as env variables!"""
super().__init__()

self._previous_error_state = False
self.name = name
self.session = session
self.broadcast_service = None
self.monitoring_metrics = ControllerMonitoringMetrics()

self.log = get_logger("controller.core", stream_handlers=True)
self.handlerconf = LogHandlerConf(init_ers=True)
self.log = get_logger(f"controller.core.{name}_ctrl", ers_kafka_handler=True)
log_init = get_logger("controller.core.__init__")
log_init.info(f"Initialising controller '{name}' with session '{session}'")

Expand Down Expand Up @@ -392,6 +397,16 @@ def interrupt_with_exception(self, *args, **kwargs):
return self.broadcast_service._interrupt_with_exception(*args, **kwargs)

def controller_publisher(self, message, custom_origin: dict | None = None):
if isinstance(message, FSMStatus) and message.in_error:
if message.in_error and not self._previous_error_state:
self.log.error(
f"{self.name} is now in an error state", extra=self.handlerconf.ERS
)
elif not message.in_error and self._previous_error_state:
self.log.info(
f"{self.name} is now in a good state", extra=self.handlerconf.ERS
)
self._previous_error_state = message.in_error
if self.opmon_publisher is not None:
try:
if custom_origin is None:
Expand Down Expand Up @@ -989,7 +1004,10 @@ def execute_fsm_command(

# Check if node is in error.
if self.stateful_node.node_is_in_error():
self.log.error(f"Command '{command_name}' not executed: node is in error.")
self.log.error(
f"Command '{command_name}' not executed: node is in error.",
extra=self.handlerconf.ERS,
)
response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_IN_ERROR
return response

Expand Down
6 changes: 5 additions & 1 deletion src/drunc/data/process_manager/k8s-CERN.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
"publish_timeout": 2
},
"environment": {
"GRPC_ENABLE_FORK_SUPPORT": "false"
"GRPC_ENABLE_FORK_SUPPORT": "false",
"DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)"
},
"opmon_uri": {
"path": "monkafka.cern.ch:30092/opmon_stream",
Expand Down
6 changes: 5 additions & 1 deletion src/drunc/data/process_manager/k8s.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
"publish_timeout": 2
},
"environment": {
"GRPC_ENABLE_FORK_SUPPORT": "false"
"GRPC_ENABLE_FORK_SUPPORT": "false",
"DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout",
"DUNEDAQ_ERS_FATAL": "erstrace,lstdout",
"DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout",
"DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout"
},
"opmon_uri": {
"path": "./info.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
"publish_timeout": 2
},
"environment": {
"GRPC_ENABLE_FORK_SUPPORT": "false"
"GRPC_ENABLE_FORK_SUPPORT": "false",
"DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)"
}
}
6 changes: 5 additions & 1 deletion src/drunc/data/process_manager/ssh-CERN-kafka.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
"publish_timeout": 2
},
"environment": {
"GRPC_ENABLE_FORK_SUPPORT": "false"
"GRPC_ENABLE_FORK_SUPPORT": "false",
"DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)"
},
"opmon_uri": {
"path": "monkafka.cern.ch:30092/opmon_stream",
Expand Down
6 changes: 5 additions & 1 deletion src/drunc/data/process_manager/ssh-pocket-kafka.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
},

"environment": {
"GRPC_ENABLE_FORK_SUPPORT": "false"
"GRPC_ENABLE_FORK_SUPPORT": "false",
"DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
"type": "dummy"
},
"environment": {
"GRPC_ENABLE_FORK_SUPPORT": "false"
"GRPC_ENABLE_FORK_SUPPORT": "false",
"DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)",
"DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)"
},
"opmon_uri": {
"path": "./info.json",
Expand Down
6 changes: 5 additions & 1 deletion src/drunc/data/process_manager/ssh-standalone.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
"type": "dummy"
},
"environment": {
"GRPC_ENABLE_FORK_SUPPORT": "false"
"GRPC_ENABLE_FORK_SUPPORT": "false",
"DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout",
"DUNEDAQ_ERS_FATAL": "erstrace,lstdout",
"DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout",
"DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout"
},
"opmon_uri": {
"path": "./info.json",
Expand Down
138 changes: 132 additions & 6 deletions src/drunc/process_manager/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import time

from daqpytools.logging.handlers import HandlerType, LogHandlerConf
from druncschema.authoriser_pb2 import ActionType, SystemType
from druncschema.broadcast_pb2 import BroadcastType
from druncschema.description_pb2 import CommandDescription, Description
Expand Down Expand Up @@ -54,9 +55,13 @@ def __init__(
session: str = None,
**kwargs,
):
"""C'tor. Note that this takes the ERS env variables from the
json files defined in data/process_manager!"""
super().__init__()
self.handlerconf = LogHandlerConf(init_ers=True)
self.log = get_logger(
f"process_manager.{configuration.get_data_type_name()}_process_manager"
f"process_manager.{configuration.get_data_type_name()}_process_manager",
ers_kafka_handler=True,
)
self.log.debug(pid_info_str())
self.log.debug("Initialized ProcessManager")
Expand All @@ -77,8 +82,12 @@ def __init__(
interval_s = getattr(self.configuration.get_data(), "interval_s", 10.0)
self.authoriser = DummyAuthoriser(dach, SystemType.PROCESS_MANAGER)

self.process_store = {} # dict[str, sh.RunningCommand]
self.boot_request = {} # dict[str, BootRequest]
self.process_store = {} # dict[str, sh.RunningCommand] # str = uuid
self.boot_request = {} # dict[str, BootRequest] # str = uuid

# Define a list of applications that we expect to die, and a lock to read the memory
self.dead_process_lock = threading.Lock()
self.expected_dead_applications = {} # dict[str, BootRequest] # str == uuid

# TODO, probably need to think of a better way to do this?
# Maybe I should "bind" the commands to their methods, and have something looping over this list to generate the gRPC functions
Expand Down Expand Up @@ -169,6 +178,15 @@ def __del__(self):
self.thread.join()

def publish(self, q: ProcessQuery, interval_s: float = 10.0):
def find_by_uuid(pi_list, target_uuid: str):
    """Return the process instance from *pi_list* whose UUID equals
    *target_uuid*, or None when no entry matches."""
    return next(
        (proc for proc in pi_list.values if proc.uuid.uuid == target_uuid),
        None,
    )

n_dead_prev = 0
dead_processes_prev = set()
while not self.stop_event.is_set():
results = self._ps_impl(q)

Expand All @@ -177,11 +195,12 @@ def publish(self, q: ProcessQuery, interval_s: float = 10.0):
for process in results.values
if process.status_code == ProcessInstance.StatusCode.RUNNING
)
n_dead = sum(
1
dead_processes = {
process.uuid.uuid
for process in results.values
if process.status_code == ProcessInstance.StatusCode.DEAD
)
}
n_dead = len(dead_processes)
n_session = len(
{
process.process_description.metadata.session
Expand All @@ -193,6 +212,21 @@ def publish(self, q: ProcessQuery, interval_s: float = 10.0):
n_running=n_running, n_dead=n_dead, n_session=n_session
),
)
if n_dead_prev < n_dead:
n_dead_prev = n_dead
diff_set = dead_processes - dead_processes_prev
for diff in diff_set:
if diff in self.expected_dead_applications:
self.log.debug(
f"Process {diff} already expected to be dead, continuing"
)
continue
pi = find_by_uuid(results, diff)
err_msg = f"Process {pi.process_description.metadata.name} with UUID {pi.uuid.uuid} has died with a return code {pi.return_code}"
# easiest way to send one to Rich and ERS
self.log.critical(err_msg, extra={"handlers": [HandlerType.Rich]})
self.log.critical(err_msg, extra=self.handlerconf.ERS)

time.sleep(interval_s)

"""
Expand Down Expand Up @@ -275,7 +309,10 @@ def terminate(
self.log.debug(f"{self.name} running terminate")

try:
self.mark_all_processes_as_expected_dead()
response = self._terminate_impl()
# Remove the list of dead applications, they are expected to be dead.
self.clear_dead_processes()
except NotImplementedError:
return ProcessInstanceList(
name=self.name,
Expand Down Expand Up @@ -376,6 +413,7 @@ def flush(

ret = []
for uuid in self._get_process_uid(request):
# Some unknown process was found, assume it is dead and move on
if uuid not in self.boot_request:
pu = ProcessUUID(uuid=uuid)
pi = ProcessInstance(
Expand Down Expand Up @@ -403,6 +441,7 @@ def flush(
except Exception:
pass

# If a process is already dead, remove it from the process store
if not self.process_store[uuid].is_alive():
pi = ProcessInstance(
process_description=pd,
Expand All @@ -415,6 +454,10 @@ def flush(
return_code=return_code,
uuid=pu,
)
# If we know that this process has died intentionally, remove it from
# tracking
self.remove_process_from_expected_dead_processes(uuid)

del self.process_store[uuid]
ret += [pi]

Expand Down Expand Up @@ -588,6 +631,89 @@ def _match_processes_against_query(

return processes

def add_process_to_expected_dead_processes(self, uuid: str) -> None:
    """
    Add the process to the list of processes that are expected to die. Needed as the
    OpMon publisher publishes the state when a process dies unexpectedly, and these
    processes require tracking.

    A defensive copy of the boot request is stored so later mutation of
    ``self.boot_request`` cannot affect the tracked entry.

    Args:
        uuid: str - process UUID to add to the dict

    Returns:
        None

    Raises:
        None - an unknown UUID is logged as an error instead of raising.
    """
    with self.dead_process_lock:
        if uuid in self.boot_request:
            # Copy the boot request so the tracked entry is independent of
            # any later changes to self.boot_request.
            br = BootRequest()
            br.CopyFrom(self.boot_request[uuid])
            self.expected_dead_applications[uuid] = br
        else:
            err_msg = f"Unexpected process with UUID {uuid} requested to be added to the list of dead applications!"
            self.log.error(err_msg)

def remove_process_from_expected_dead_processes(self, uuid: str) -> None:
    """
    Remove the process from the list of processes that are expected to die. Needed as
    the OpMon publisher publishes the state when a process dies unexpectedly, and
    these processes require tracking.

    Args:
        uuid: str - process UUID to remove from the dict

    Returns:
        None

    Raises:
        None - an untracked UUID is logged as an error instead of raising.
    """
    with self.dead_process_lock:
        # EAFP: attempt the removal and log if the UUID was never tracked,
        # avoiding the membership check + pop double lookup.
        try:
            del self.expected_dead_applications[uuid]
        except KeyError:
            err_msg = f"Unexpected process with UUID {uuid} requested to be removed from the list of expected_dead_applications!"
            self.log.error(err_msg)

def mark_all_processes_as_expected_dead(self) -> None:
    """
    Mark every currently booted process as expected to die, so the monitoring
    loop does not report their deaths as errors (e.g. during terminate).

    Entries already present in ``expected_dead_applications`` are left
    untouched; only missing UUIDs are filled in.

    Args:
        None

    Returns:
        None

    Raises:
        None
    """
    with self.dead_process_lock:
        for proc_uuid, request in self.boot_request.items():
            # setdefault preserves any entry that is already tracked.
            self.expected_dead_applications.setdefault(proc_uuid, request)

def clear_dead_processes(self) -> None:
    """
    Forget every process currently tracked as expected to die.

    Args:
        None

    Returns:
        None

    Raises:
        None
    """
    # Explicit acquire/release of the lock guarding the tracker; the
    # dict object is cleared in place, never rebound.
    self.dead_process_lock.acquire()
    try:
        self.expected_dead_applications.clear()
    finally:
        self.dead_process_lock.release()

def _get_process_uid(
self,
query: ProcessQuery,
Expand Down
Loading
Loading