From b9bb6fb0b7b8f93d1a75a6b26b23e639cf07f6da Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 5 Nov 2025 11:44:21 +0100 Subject: [PATCH 01/11] Add ers_uri and parser for ers handler Massive code cleanup --- src/drunc/controller/controller.py | 52 ++++++++++- src/drunc/controller/controller_driver.py | 5 ++ .../data/process_manager/ssh-CERN-kafka.json | 5 ++ .../data/process_manager/ssh-standalone.json | 4 + src/drunc/process_manager/configuration.py | 2 + src/drunc/process_manager/process_manager.py | 86 +++++++++++++++++-- 6 files changed, 146 insertions(+), 8 deletions(-) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index d2cc2a509..ba0723dbb 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -1,3 +1,4 @@ +import os import multiprocessing import re import threading @@ -67,6 +68,8 @@ from drunc.utils.grpc_utils import UnpackingError, pack_to_any, unpack_any from drunc.utils.utils import get_logger +from daqpytools.logging.handlers import add_ers_kafka_handler, LogHandlerConf + T = TypeVar("T") @@ -221,15 +224,34 @@ class Controller(ControllerServicer): def __init__(self, configuration, name: str, session: str, token: Token): super().__init__() + # handlerconf will be here + # construct the handler conf + # invert + + # Define it as a control utilities that you call here in the controller class + + # nvm move it to utils/utils (which is where all the logging function is gonna happen anyway) + self.name = name self.session = session self.broadcast_service = None self.monitoring_metrics = ControllerMonitoringMetrics() + self.handlerconf = LogHandlerConf() + self.log = get_logger("controller.core", stream_handlers=True) + + add_ers_kafka_handler(self.log, True, "emir-test") + log_init = get_logger("controller.core.__init__") + test_loggess = get_logger("controller.heyo", stream_handlers=True) + test_loggess.error("Hello, world") log_init.info(f"Initialising controller '{name}' 
with session '{session}'") + test_loggess.critical(f"{os.getenv('DUNEDAQ_ERS_INFO')=}") + + #! This is the next big step, I need to figureo ut how to use another configuration.. + self.configuration = configuration self.top_segment_controller = ( self.configuration.db.get_dal( @@ -250,6 +272,8 @@ def __init__(self, configuration, name: str, session: str, token: Token): data=self.configuration.data.controller.broadcaster, ) + # And then its gonna go here im sure + self.broadcast_service = BroadcastSender( name=name, session=session, @@ -965,20 +989,35 @@ def execute_fsm_command( flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + newlog = get_logger("mytestlogger", rich_handler=True) + newlog.warning("Hello there") + + # use this as a partial to execute all the stateful commands + try: # Parse and validate target. request.target = self.parse_target_string(request.target) except ValueError: response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + + #! This is the flag that determines the error stares + # enums are in ResponseFlag return response + newlog.warning("Just finished a try") + command = request.command command_name = command.command_name - self.log.debug(f"FSM command: {command_name}") + + self.log.warning(f"FSM command: {command_name}") transition = self.stateful_node.get_fsm_transition(command_name) - self.log.debug(f"FSM transition: {transition}") + self.log.warning(f"FSM transition: {transition}") # Check controller readiness. + + #! This is the place wher ethe things get messaged on where it doesnt go as planned + + #! This area is wher eyou want to put the message in! if not self.stateful_node.get_ready_state(): self.log.error( f"Command '{command_name}' not executed: controller is not ready." @@ -990,6 +1029,12 @@ def execute_fsm_command( # Check if node is in error. if self.stateful_node.node_is_in_error(): self.log.error(f"Command '{command_name}' not executed: node is in error.") + + self.log.error("THIS SHOULD GO TO ERS!!!! 
yes this i can do...") + + # Wrap it up folks this is what we're after + self.log.critical("Test error for ERS", extra=self.handlerconf.ERS) + response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_IN_ERROR return response @@ -1500,6 +1545,9 @@ def to_error( token, ) + #! We probably want this as well... + self.log.critical("This application has been put into error") + return Response( name=self.name, token=token, diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index 88ef86f3d..3e5947739 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -79,6 +79,11 @@ def __init__(self, address: str, token: Token): ("grpc.keepalive_time_ms", 60000) # pings the server every 60 seconds ] + self.log.error(f"address: {address}, token: {token}") + + #! SO HERE!!! is where we initialise the add ers protobuf handler. The question is now how do we get the correct config? + #! But once yoh ave this it should be fine + try: resolved_address = self._resolve_address_to_ipv4(address) except ValueError as e: diff --git a/src/drunc/data/process_manager/ssh-CERN-kafka.json b/src/drunc/data/process_manager/ssh-CERN-kafka.json index e53f89267..8c22a80d9 100644 --- a/src/drunc/data/process_manager/ssh-CERN-kafka.json +++ b/src/drunc/data/process_manager/ssh-CERN-kafka.json @@ -20,6 +20,11 @@ "path": "monkafka.cern.ch:30092/opmon_stream", "type": "stream" }, + "ers_uri": { + "path": "monkafka.cern.ch:30092", + "topic": "ers_stream", + "type": "stream" + }, "opmon_conf":{ "level": "debug", "interval_s" : 10.0 diff --git a/src/drunc/data/process_manager/ssh-standalone.json b/src/drunc/data/process_manager/ssh-standalone.json index 9de7c66b3..7aa9b720e 100644 --- a/src/drunc/data/process_manager/ssh-standalone.json +++ b/src/drunc/data/process_manager/ssh-standalone.json @@ -16,6 +16,10 @@ "path": "./info.json", "type": "file" }, + "ers_uri": { + "path": "./info.json", + "type": "file" + }, 
"opmon_conf": { "level": "info", "interval_s": 10.0 diff --git a/src/drunc/process_manager/configuration.py b/src/drunc/process_manager/configuration.py index a4ac057fd..46217cac1 100644 --- a/src/drunc/process_manager/configuration.py +++ b/src/drunc/process_manager/configuration.py @@ -36,6 +36,7 @@ def __init__(self): self.settings = {} self.opmon_uri = None self.opmon_publisher = None + self.ers_uri = None class ProcessManagerConfHandler(ConfHandler): @@ -55,6 +56,7 @@ def _parse_dict(self, data): ) new_data.environment = data.get("environment", {}) new_data.settings = data.get("settings", {}) + new_data.ers_uri = data.get("ers_uri") match data["type"].lower(): case "ssh": diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 1345efddb..df7a4e16c 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -3,6 +3,7 @@ import threading import time +from daqpytools.logging.handlers import add_ers_kafka_handler from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.description_pb2 import CommandDescription, Description @@ -23,6 +24,9 @@ Request, ResponseFlag, ) + +# import logging +from erskafka.ERSKafkaLogHandler import ERSKafkaLogHandler from google.rpc import code_pb2 from grpc import ServicerContext @@ -58,8 +62,37 @@ def __init__( self.log = get_logger( f"process_manager.{configuration.get_data_type_name()}_process_manager" ) + + #! This is here because we want to set up this specific instance + + try: + ers_uri = configuration.get_data().ers_uri + self.log.warning(ers_uri) + if ers_uri["type"] == "file": + self.log.warning("Placeholder for log handler") + # TODO Add the log handler? If we add it its gonna spam the file handler I think. We need to be careful + + else: + #! This is hacky! 
+ self.log.warning("Checking if ERS handler exists") + if not any( + isinstance(h, ERSKafkaLogHandler) for h in self.log.handlers + ): + self.log.warning("Going to add ers handler") + + # TODO: Will need to figure out if we want to add this to the current handler or to the root handler.. + add_ers_kafka_handler( + self.log, + use_parent_handlers=True, + session_name="session_temporary", + topic=ers_uri["topic"], + address=ers_uri["path"], + ) + except: + self.log.error("Failed to initialise") + raise DruncCommandException("Failed to initialise") + self.log.debug(pid_info_str()) - self.log.debug("Initialized ProcessManager") self.configuration = configuration self.name = name @@ -169,6 +202,19 @@ def __del__(self): self.thread.join() def publish(self, q: ProcessQuery, interval_s: float = 10.0): + def find_by_uuid(pi_list, target_uuid: str): + for pi in pi_list.values: + if pi.uuid.uuid == target_uuid: + return pi + return None + + # pi = find_by_uuid(process_instance_list, "5") + + n_dead_keep = 0 + graveyard = set() # Set of UU1Ds + + # lambda # obtain dead ones + while not self.stop_event.is_set(): results = self._ps_impl(q) @@ -177,11 +223,14 @@ def publish(self, q: ProcessQuery, interval_s: float = 10.0): for process in results.values if process.status_code == ProcessInstance.StatusCode.RUNNING ) - n_dead = sum( - 1 + + dead_processes = { + process.uuid.uuid for process in results.values if process.status_code == ProcessInstance.StatusCode.DEAD - ) + } + n_dead = len(dead_processes) + n_session = len( { process.process_description.metadata.session @@ -193,6 +242,30 @@ def publish(self, q: ProcessQuery, interval_s: float = 10.0): n_running=n_running, n_dead=n_dead, n_session=n_session ), ) + #! 
Obviously hacky but proof of principle + # Checks if n_dead changed compared to the previous iteration + # self.log.warning( + # f"Processes: {n_running}, {n_dead}, {n_session}, {n_dead_keep}" + # ) + + # Change this to increase + if n_dead_keep < n_dead: + # check which one has died + n_dead_keep = n_dead + # self.log.error("We have a real dead one!", extra={"use_ers": True}) + + print(graveyard) + print(dead_processes) + diff_set = dead_processes - graveyard + print(diff_set) + for diff in diff_set: + print(diff) + pi = find_by_uuid(results, diff) + # print(pi) + self.log.critical( + f"Process {pi.process_description.metadata.name} with UUID {pi.uuid.uuid} has died with a return code {pi.return_code}" + ) + time.sleep(interval_s) """ @@ -200,7 +273,7 @@ def publish(self, q: ProcessQuery, interval_s: float = 10.0): """ def broadcast(self, *args, **kwargs): - self.log.debug(f"{self.name} broadcasting") + self.log.critical(f"{self.name} broadcasting") return ( self.broadcast_service.broadcast(*args, **kwargs) if self.broadcast_service @@ -409,7 +482,8 @@ def flush( process_restriction=pr, status_code=( ProcessInstance.StatusCode.RUNNING - if self.process_store[uuid].is_alive() + if self.process_store[uuid].is_alive() # this might be the key + # can you easily hook into the processinstancelist stored within processmanager to store in this query else ProcessInstance.StatusCode.DEAD ), return_code=return_code, From a52613c7527d1105d8b305db64b0246b8d3d1e32 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Tue, 10 Feb 2026 16:04:13 +0100 Subject: [PATCH 02/11] working prototype of everything --- src/drunc/controller/controller.py | 73 ++++++------------ src/drunc/controller/controller_driver.py | 5 -- src/drunc/process_manager/process_manager.py | 80 +++++--------------- 3 files changed, 43 insertions(+), 115 deletions(-) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index ba0723dbb..41629873b 100644 --- 
a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -8,6 +8,7 @@ from functools import wraps from typing import Callable, List, TypeVar +from druncschema.opmon.FSM_pb2 import FSMStatus from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.controller_pb2 import ( @@ -68,7 +69,7 @@ from drunc.utils.grpc_utils import UnpackingError, pack_to_any, unpack_any from drunc.utils.utils import get_logger -from daqpytools.logging.handlers import add_ers_kafka_handler, LogHandlerConf +from daqpytools.logging.handlers import LogHandlerConf T = TypeVar("T") @@ -224,33 +225,29 @@ class Controller(ControllerServicer): def __init__(self, configuration, name: str, session: str, token: Token): super().__init__() - # handlerconf will be here - # construct the handler conf - # invert - - # Define it as a control utilities that you call here in the controller class - - # nvm move it to utils/utils (which is where all the logging function is gonna happen anyway) - + self._previous_error_state = False self.name = name self.session = session self.broadcast_service = None self.monitoring_metrics = ControllerMonitoringMetrics() + self.handlerconf = LogHandlerConf(init_ers=True) - self.handlerconf = LogHandlerConf() - - self.log = get_logger("controller.core", stream_handlers=True) - - add_ers_kafka_handler(self.log, True, "emir-test") + #! I am now in favour of splitting controller.core here to + # "controller.core.{self.name}" or something similar + # so we have one kafka handler per controller + # its a lot of handlers but we probably want individual handlers per controller? 
+ + self.log = get_logger(f"controller.core.{name}_ctrl", ers_kafka_handler=True) log_init = get_logger("controller.core.__init__") - test_loggess = get_logger("controller.heyo", stream_handlers=True) - test_loggess.error("Hello, world") log_init.info(f"Initialising controller '{name}' with session '{session}'") - + + #TODO: Delete before merge + test_loggess = get_logger("controller.heyo", stream_handlers=True) test_loggess.critical(f"{os.getenv('DUNEDAQ_ERS_INFO')=}") - - #! This is the next big step, I need to figureo ut how to use another configuration.. + test_loggess.critical(f"{os.getenv('DUNEDAQ_ERS_WARNING')=}") + test_loggess.critical(f"{os.getenv('DUNEDAQ_ERS_ERROR')=}") + test_loggess.critical(f"{os.getenv('DUNEDAQ_ERS_FATAL')=}") self.configuration = configuration self.top_segment_controller = ( @@ -272,8 +269,6 @@ def __init__(self, configuration, name: str, session: str, token: Token): data=self.configuration.data.controller.broadcaster, ) - # And then its gonna go here im sure - self.broadcast_service = BroadcastSender( name=name, session=session, @@ -416,6 +411,12 @@ def interrupt_with_exception(self, *args, **kwargs): return self.broadcast_service._interrupt_with_exception(*args, **kwargs) def controller_publisher(self, message, custom_origin: dict | None = None): + if isinstance(message, FSMStatus) and message.in_error: + if message.in_error and not self._previous_error_state: + self.log.error(f"{self.name} is now in an error state", extra=self.handlerconf.ERS) + elif not message.in_error and self._previous_error_state: + self.log.info(f"{self.name} is now in a good state",extra=self.handlerconf.ERS) + self._previous_error_state = message.in_error if self.opmon_publisher is not None: try: if custom_origin is None: @@ -989,35 +990,20 @@ def execute_fsm_command( flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) - newlog = get_logger("mytestlogger", rich_handler=True) - newlog.warning("Hello there") - - # use this as a partial to execute all the stateful 
commands - try: # Parse and validate target. request.target = self.parse_target_string(request.target) except ValueError: response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT - - #! This is the flag that determines the error stares - # enums are in ResponseFlag return response - newlog.warning("Just finished a try") - command = request.command command_name = command.command_name - - self.log.warning(f"FSM command: {command_name}") + self.log.debug(f"FSM command: {command_name}") transition = self.stateful_node.get_fsm_transition(command_name) - self.log.warning(f"FSM transition: {transition}") + self.log.debug(f"FSM transition: {transition}") # Check controller readiness. - - #! This is the place wher ethe things get messaged on where it doesnt go as planned - - #! This area is wher eyou want to put the message in! if not self.stateful_node.get_ready_state(): self.log.error( f"Command '{command_name}' not executed: controller is not ready." @@ -1028,13 +1014,7 @@ def execute_fsm_command( # Check if node is in error. if self.stateful_node.node_is_in_error(): - self.log.error(f"Command '{command_name}' not executed: node is in error.") - - self.log.error("THIS SHOULD GO TO ERS!!!! yes this i can do...") - - # Wrap it up folks this is what we're after - self.log.critical("Test error for ERS", extra=self.handlerconf.ERS) - + self.log.error(f"Command '{command_name}' not executed: node is in error.", extra=self.handlerconf.ERS) response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_IN_ERROR return response @@ -1545,9 +1525,6 @@ def to_error( token, ) - #! We probably want this as well... 
- self.log.critical("This application has been put into error") - return Response( name=self.name, token=token, diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index 3e5947739..88ef86f3d 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -79,11 +79,6 @@ def __init__(self, address: str, token: Token): ("grpc.keepalive_time_ms", 60000) # pings the server every 60 seconds ] - self.log.error(f"address: {address}, token: {token}") - - #! SO HERE!!! is where we initialise the add ers protobuf handler. The question is now how do we get the correct config? - #! But once yoh ave this it should be fine - try: resolved_address = self._resolve_address_to_ipv4(address) except ValueError as e: diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index df7a4e16c..f99af8a79 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -2,8 +2,11 @@ import re import threading import time +import os + +from daqpytools.logging.handlers import LogHandlerConf, HandlerType + -from daqpytools.logging.handlers import add_ers_kafka_handler from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.description_pb2 import CommandDescription, Description @@ -24,9 +27,6 @@ Request, ResponseFlag, ) - -# import logging -from erskafka.ERSKafkaLogHandler import ERSKafkaLogHandler from google.rpc import code_pb2 from grpc import ServicerContext @@ -59,40 +59,14 @@ def __init__( **kwargs, ): super().__init__() + ers_uri = configuration.get_data().ers_uri + self.handlerconf = LogHandlerConf(init_ers=False) self.log = get_logger( - f"process_manager.{configuration.get_data_type_name()}_process_manager" + f"process_manager.{configuration.get_data_type_name()}_process_manager", ers_kafka_handler= ers_uri["type"] == "stream" ) - 
- #! This is here because we want to set up this specific instance - - try: - ers_uri = configuration.get_data().ers_uri - self.log.warning(ers_uri) - if ers_uri["type"] == "file": - self.log.warning("Placeholder for log handler") - # TODO Add the log handler? If we add it its gonna spam the file handler I think. We need to be careful - - else: - #! This is hacky! - self.log.warning("Checking if ERS handler exists") - if not any( - isinstance(h, ERSKafkaLogHandler) for h in self.log.handlers - ): - self.log.warning("Going to add ers handler") - - # TODO: Will need to figure out if we want to add this to the current handler or to the root handler.. - add_ers_kafka_handler( - self.log, - use_parent_handlers=True, - session_name="session_temporary", - topic=ers_uri["topic"], - address=ers_uri["path"], - ) - except: - self.log.error("Failed to initialise") - raise DruncCommandException("Failed to initialise") - self.log.debug(pid_info_str()) + self.log.debug("Initialized ProcessManager") + self.log.info(f'{ers_uri["type"] == "stream"}') # TODO: Delete before merge self.configuration = configuration self.name = name @@ -169,6 +143,9 @@ def __init__( self.broadcast(message="ready", btype=BroadcastType.SERVER_READY) + #! Envs dont exist! 
need to have a workaround + self.log.critical(f"{os.getenv('DUNEDAQ_ERS_ERROR')=}") + if self.opmon_publisher is not None: self.stop_event = threading.Event() self.thread = threading.Thread( @@ -207,14 +184,9 @@ def find_by_uuid(pi_list, target_uuid: str): if pi.uuid.uuid == target_uuid: return pi return None - - # pi = find_by_uuid(process_instance_list, "5") - + n_dead_keep = 0 - graveyard = set() # Set of UU1Ds - - # lambda # obtain dead ones - + graveyard = set() while not self.stop_event.is_set(): results = self._ps_impl(q) @@ -223,14 +195,12 @@ def find_by_uuid(pi_list, target_uuid: str): for process in results.values if process.status_code == ProcessInstance.StatusCode.RUNNING ) - dead_processes = { process.uuid.uuid for process in results.values if process.status_code == ProcessInstance.StatusCode.DEAD } n_dead = len(dead_processes) - n_session = len( { process.process_description.metadata.session @@ -242,29 +212,16 @@ def find_by_uuid(pi_list, target_uuid: str): n_running=n_running, n_dead=n_dead, n_session=n_session ), ) - #! Obviously hacky but proof of principle - # Checks if n_dead changed compared to the previous iteration - # self.log.warning( - # f"Processes: {n_running}, {n_dead}, {n_session}, {n_dead_keep}" - # ) - # Change this to increase if n_dead_keep < n_dead: - # check which one has died n_dead_keep = n_dead - # self.log.error("We have a real dead one!", extra={"use_ers": True}) - - print(graveyard) - print(dead_processes) diff_set = dead_processes - graveyard - print(diff_set) for diff in diff_set: - print(diff) pi = find_by_uuid(results, diff) - # print(pi) self.log.critical( - f"Process {pi.process_description.metadata.name} with UUID {pi.uuid.uuid} has died with a return code {pi.return_code}" + f"Process {pi.process_description.metadata.name} with UUID {pi.uuid.uuid} has died with a return code {pi.return_code}", extra={"handlers": [HandlerType.Rich, HandlerType.Protobufstream]} ) + #! 
No environment variables detected here time.sleep(interval_s) @@ -273,7 +230,7 @@ def find_by_uuid(pi_list, target_uuid: str): """ def broadcast(self, *args, **kwargs): - self.log.critical(f"{self.name} broadcasting") + self.log.debug(f"{self.name} broadcasting") return ( self.broadcast_service.broadcast(*args, **kwargs) if self.broadcast_service @@ -482,8 +439,7 @@ def flush( process_restriction=pr, status_code=( ProcessInstance.StatusCode.RUNNING - if self.process_store[uuid].is_alive() # this might be the key - # can you easily hook into the processinstancelist stored within processmanager to store in this query + if self.process_store[uuid].is_alive() else ProcessInstance.StatusCode.DEAD ), return_code=return_code, From 2cebe6d1d0c8d100aeae5fd260aba4fed12fd5ec Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 11 Feb 2026 11:10:10 +0100 Subject: [PATCH 03/11] Change how configuration is done with PM --- src/drunc/controller/controller.py | 33 +++++++------------ .../data/process_manager/ssh-CERN-kafka.json | 11 +++---- .../data/process_manager/ssh-standalone.json | 10 +++--- src/drunc/process_manager/configuration.py | 2 -- src/drunc/process_manager/process_manager.py | 25 +++++--------- 5 files changed, 30 insertions(+), 51 deletions(-) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index 41629873b..5ad3089bf 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -1,4 +1,3 @@ -import os import multiprocessing import re import threading @@ -8,7 +7,7 @@ from functools import wraps from typing import Callable, List, TypeVar -from druncschema.opmon.FSM_pb2 import FSMStatus +from daqpytools.logging.handlers import LogHandlerConf from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.controller_pb2 import ( @@ -35,6 +34,7 @@ from druncschema.controller_pb2_grpc import ControllerServicer from 
druncschema.description_pb2 import Description from druncschema.generic_pb2 import PlainText, Stacktrace +from druncschema.opmon.FSM_pb2 import FSMStatus from druncschema.opmon.generic_pb2 import RunInfo from druncschema.request_response_pb2 import Response, ResponseFlag from druncschema.token_pb2 import Token @@ -69,8 +69,6 @@ from drunc.utils.grpc_utils import UnpackingError, pack_to_any, unpack_any from drunc.utils.utils import get_logger -from daqpytools.logging.handlers import LogHandlerConf - T = TypeVar("T") @@ -231,23 +229,9 @@ def __init__(self, configuration, name: str, session: str, token: Token): self.broadcast_service = None self.monitoring_metrics = ControllerMonitoringMetrics() self.handlerconf = LogHandlerConf(init_ers=True) - - - #! I am now in favour of splitting controller.core here to - # "controller.core.{self.name}" or something similar - # so we have one kafka handler per controller - # its a lot of handlers but we probably want individual handlers per controller? 
- self.log = get_logger(f"controller.core.{name}_ctrl", ers_kafka_handler=True) log_init = get_logger("controller.core.__init__") log_init.info(f"Initialising controller '{name}' with session '{session}'") - - #TODO: Delete before merge - test_loggess = get_logger("controller.heyo", stream_handlers=True) - test_loggess.critical(f"{os.getenv('DUNEDAQ_ERS_INFO')=}") - test_loggess.critical(f"{os.getenv('DUNEDAQ_ERS_WARNING')=}") - test_loggess.critical(f"{os.getenv('DUNEDAQ_ERS_ERROR')=}") - test_loggess.critical(f"{os.getenv('DUNEDAQ_ERS_FATAL')=}") self.configuration = configuration self.top_segment_controller = ( @@ -413,9 +397,13 @@ def interrupt_with_exception(self, *args, **kwargs): def controller_publisher(self, message, custom_origin: dict | None = None): if isinstance(message, FSMStatus) and message.in_error: if message.in_error and not self._previous_error_state: - self.log.error(f"{self.name} is now in an error state", extra=self.handlerconf.ERS) + self.log.error( + f"{self.name} is now in an error state", extra=self.handlerconf.ERS + ) elif not message.in_error and self._previous_error_state: - self.log.info(f"{self.name} is now in a good state",extra=self.handlerconf.ERS) + self.log.info( + f"{self.name} is now in a good state", extra=self.handlerconf.ERS + ) self._previous_error_state = message.in_error if self.opmon_publisher is not None: try: @@ -1014,7 +1002,10 @@ def execute_fsm_command( # Check if node is in error. 
if self.stateful_node.node_is_in_error(): - self.log.error(f"Command '{command_name}' not executed: node is in error.", extra=self.handlerconf.ERS) + self.log.error( + f"Command '{command_name}' not executed: node is in error.", + extra=self.handlerconf.ERS, + ) response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_IN_ERROR return response diff --git a/src/drunc/data/process_manager/ssh-CERN-kafka.json b/src/drunc/data/process_manager/ssh-CERN-kafka.json index 8c22a80d9..bb31b477b 100644 --- a/src/drunc/data/process_manager/ssh-CERN-kafka.json +++ b/src/drunc/data/process_manager/ssh-CERN-kafka.json @@ -14,17 +14,16 @@ "publish_timeout": 2 }, "environment": { - "GRPC_ENABLE_FORK_SUPPORT": "false" + "GRPC_ENABLE_FORK_SUPPORT": "false", + "DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)" }, "opmon_uri": { "path": "monkafka.cern.ch:30092/opmon_stream", "type": "stream" }, - "ers_uri": { - "path": "monkafka.cern.ch:30092", - "topic": "ers_stream", - "type": "stream" - }, "opmon_conf":{ "level": "debug", "interval_s" : 10.0 diff --git a/src/drunc/data/process_manager/ssh-standalone.json b/src/drunc/data/process_manager/ssh-standalone.json index 7aa9b720e..e21a29709 100644 --- a/src/drunc/data/process_manager/ssh-standalone.json +++ b/src/drunc/data/process_manager/ssh-standalone.json @@ -10,16 +10,16 @@ "type": "dummy" }, "environment": { - "GRPC_ENABLE_FORK_SUPPORT": "false" + "GRPC_ENABLE_FORK_SUPPORT": "false", + "DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + 
"DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)" }, "opmon_uri": { "path": "./info.json", "type": "file" }, - "ers_uri": { - "path": "./info.json", - "type": "file" - }, "opmon_conf": { "level": "info", "interval_s": 10.0 diff --git a/src/drunc/process_manager/configuration.py b/src/drunc/process_manager/configuration.py index 46217cac1..a4ac057fd 100644 --- a/src/drunc/process_manager/configuration.py +++ b/src/drunc/process_manager/configuration.py @@ -36,7 +36,6 @@ def __init__(self): self.settings = {} self.opmon_uri = None self.opmon_publisher = None - self.ers_uri = None class ProcessManagerConfHandler(ConfHandler): @@ -56,7 +55,6 @@ def _parse_dict(self, data): ) new_data.environment = data.get("environment", {}) new_data.settings = data.get("settings", {}) - new_data.ers_uri = data.get("ers_uri") match data["type"].lower(): case "ssh": diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index f99af8a79..7be47b5c6 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -2,11 +2,8 @@ import re import threading import time -import os - -from daqpytools.logging.handlers import LogHandlerConf, HandlerType - +from daqpytools.logging.handlers import LogHandlerConf from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.description_pb2 import CommandDescription, Description @@ -59,14 +56,13 @@ def __init__( **kwargs, ): super().__init__() - ers_uri = configuration.get_data().ers_uri - self.handlerconf = LogHandlerConf(init_ers=False) + self.handlerconf = LogHandlerConf(init_ers=True) self.log = get_logger( - f"process_manager.{configuration.get_data_type_name()}_process_manager", ers_kafka_handler= ers_uri["type"] == "stream" + f"process_manager.{configuration.get_data_type_name()}_process_manager", + ers_kafka_handler=True, ) 
self.log.debug(pid_info_str()) self.log.debug("Initialized ProcessManager") - self.log.info(f'{ers_uri["type"] == "stream"}') # TODO: Delete before merge self.configuration = configuration self.name = name @@ -143,9 +139,6 @@ def __init__( self.broadcast(message="ready", btype=BroadcastType.SERVER_READY) - #! Envs dont exist! need to have a workaround - self.log.critical(f"{os.getenv('DUNEDAQ_ERS_ERROR')=}") - if self.opmon_publisher is not None: self.stop_event = threading.Event() self.thread = threading.Thread( @@ -184,9 +177,9 @@ def find_by_uuid(pi_list, target_uuid: str): if pi.uuid.uuid == target_uuid: return pi return None - + n_dead_keep = 0 - graveyard = set() + graveyard = set() while not self.stop_event.is_set(): results = self._ps_impl(q) @@ -212,17 +205,15 @@ def find_by_uuid(pi_list, target_uuid: str): n_running=n_running, n_dead=n_dead, n_session=n_session ), ) - if n_dead_keep < n_dead: n_dead_keep = n_dead diff_set = dead_processes - graveyard for diff in diff_set: pi = find_by_uuid(results, diff) self.log.critical( - f"Process {pi.process_description.metadata.name} with UUID {pi.uuid.uuid} has died with a return code {pi.return_code}", extra={"handlers": [HandlerType.Rich, HandlerType.Protobufstream]} + f"Process {pi.process_description.metadata.name} with UUID {pi.uuid.uuid} has died with a return code {pi.return_code}", + extra=self.handlerconf.ERS, ) - #! 
No environment variables detected here - time.sleep(interval_s) """ From 5173ee81a14639dbfc0fa77ed1f87e92f70841ab Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 11 Feb 2026 12:04:07 +0100 Subject: [PATCH 04/11] Send pm message to rich as well --- src/drunc/process_manager/process_manager.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 7be47b5c6..911c92ef9 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -3,7 +3,7 @@ import threading import time -from daqpytools.logging.handlers import LogHandlerConf +from daqpytools.logging.handlers import HandlerType, LogHandlerConf from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.description_pb2 import CommandDescription, Description @@ -210,10 +210,11 @@ def find_by_uuid(pi_list, target_uuid: str): diff_set = dead_processes - graveyard for diff in diff_set: pi = find_by_uuid(results, diff) - self.log.critical( - f"Process {pi.process_description.metadata.name} with UUID {pi.uuid.uuid} has died with a return code {pi.return_code}", - extra=self.handlerconf.ERS, - ) + err_msg = f"Process {pi.process_description.metadata.name} with UUID {pi.uuid.uuid} has died with a return code {pi.return_code}" + # easiest way to send one to Rich and ERS + self.log.critical(err_msg, extra={"handlers": [HandlerType.Rich]}) + self.log.critical(err_msg, extra=self.handlerconf.ERS) + time.sleep(interval_s) """ From 5038d55257a9fdf286d77efbc33a4e11eafb40fa Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 11 Feb 2026 15:08:37 +0100 Subject: [PATCH 05/11] Inject ers env in pytest --- .../test_process_manager_endpoints.py | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/process_manager/test_process_manager_endpoints.py 
b/tests/process_manager/test_process_manager_endpoints.py index 6aefb85a1..92d1e4d08 100644 --- a/tests/process_manager/test_process_manager_endpoints.py +++ b/tests/process_manager/test_process_manager_endpoints.py @@ -44,7 +44,7 @@ def mock_logger(): @pytest.fixture(scope="function") -def grpc_servicer(mock_logger): +def grpc_servicer(mock_logger, monkeypatch): """ Create and configure a ConcreteProcessManager instance for testing. @@ -57,6 +57,24 @@ def grpc_servicer(mock_logger): Returns: ConcreteProcessManager: Configured servicer instance ready for testing """ + # Process Manager requires these variables to exist now + # Injecting them in the test + monkeypatch.setenv( + "DUNEDAQ_ERS_ERROR", + "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + ) + monkeypatch.setenv( + "DUNEDAQ_ERS_FATAL", "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)" + ) + monkeypatch.setenv( + "DUNEDAQ_ERS_INFO", + "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + ) + monkeypatch.setenv( + "DUNEDAQ_ERS_WARNING", + "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + ) + servicer = ConcreteProcessManager(session="mock_session") servicer._mock_logger = mock_logger return servicer From bc75d8e4c63830ee9d1056f860f58128d18f7082 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 11 Feb 2026 15:13:53 +0100 Subject: [PATCH 06/11] Add two very quick docs about ERS dependency --- src/drunc/controller/controller.py | 2 ++ src/drunc/process_manager/process_manager.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index 5ad3089bf..405e59548 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -221,6 +221,8 @@ class Controller(ControllerServicer): children_nodes: List[ChildNode] = [] def __init__(self, configuration, name: str, session: str, token: Token): + """C'tor. 
Note that controllers require the ERS variables defined + in OKS to exist as env variables!""" super().__init__() self._previous_error_state = False diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 911c92ef9..13bcafae7 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -55,6 +55,8 @@ def __init__( session: str = None, **kwargs, ): + """C'tor. Note that this takes the ERS env variables from the + json files defined in data/process_manager!""" super().__init__() self.handlerconf = LogHandlerConf(init_ers=True) self.log = get_logger( From f4fc20ad396340f0e5e3918bb731b6c860c3618b Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 11 Feb 2026 15:25:31 +0100 Subject: [PATCH 07/11] Rename variables for clarity --- src/drunc/process_manager/process_manager.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 13bcafae7..0ec1873d8 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -175,13 +175,14 @@ def __del__(self): def publish(self, q: ProcessQuery, interval_s: float = 10.0): def find_by_uuid(pi_list, target_uuid: str): + """Identifies the process from a list by uuid""" for pi in pi_list.values: if pi.uuid.uuid == target_uuid: return pi return None - n_dead_keep = 0 - graveyard = set() + n_dead_prev = 0 + dead_processes_prev = set() while not self.stop_event.is_set(): results = self._ps_impl(q) @@ -207,9 +208,9 @@ def find_by_uuid(pi_list, target_uuid: str): n_running=n_running, n_dead=n_dead, n_session=n_session ), ) - if n_dead_keep < n_dead: - n_dead_keep = n_dead - diff_set = dead_processes - graveyard + if n_dead_prev < n_dead: + n_dead_prev = n_dead + diff_set = dead_processes - dead_processes_prev for diff in diff_set: pi = find_by_uuid(results, diff) 
err_msg = f"Process {pi.process_description.metadata.name} with UUID {pi.uuid.uuid} has died with a return code {pi.return_code}" From 6ea3c3591a9640c0479c1a97a8e4ed6926506fc6 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 13 Feb 2026 16:05:41 +0100 Subject: [PATCH 08/11] Updating PM env vars --- src/drunc/data/process_manager/k8s-CERN.json | 6 +++++- src/drunc/data/process_manager/k8s.json | 6 +++++- .../data/process_manager/process-manager-k8s-pocket.json | 6 +++++- src/drunc/data/process_manager/ssh-pocket-kafka.json | 6 +++++- .../process_manager/ssh-standalone-paramiko-client.json | 6 +++++- src/drunc/data/process_manager/ssh-standalone.json | 8 ++++---- 6 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/drunc/data/process_manager/k8s-CERN.json b/src/drunc/data/process_manager/k8s-CERN.json index db760e3c4..8e5d5fbbb 100644 --- a/src/drunc/data/process_manager/k8s-CERN.json +++ b/src/drunc/data/process_manager/k8s-CERN.json @@ -14,7 +14,11 @@ "publish_timeout": 2 }, "environment": { - "GRPC_ENABLE_FORK_SUPPORT": "false" + "GRPC_ENABLE_FORK_SUPPORT": "false", + "DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)" }, "opmon_uri": { "path": "monkafka.cern.ch:30092/opmon_stream", diff --git a/src/drunc/data/process_manager/k8s.json b/src/drunc/data/process_manager/k8s.json index ea78deace..00ddfefef 100644 --- a/src/drunc/data/process_manager/k8s.json +++ b/src/drunc/data/process_manager/k8s.json @@ -14,7 +14,11 @@ "publish_timeout": 2 }, "environment": { - "GRPC_ENABLE_FORK_SUPPORT": "false" + "GRPC_ENABLE_FORK_SUPPORT": "false", + "DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout", + "DUNEDAQ_ERS_FATAL": "erstrace,lstdout", + "DUNEDAQ_ERS_INFO": 
"erstrace,throttle,lstdout", + "DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout" }, "opmon_uri": { "path": "./info.json", diff --git a/src/drunc/data/process_manager/process-manager-k8s-pocket.json b/src/drunc/data/process_manager/process-manager-k8s-pocket.json index 8139d60e8..54feacce8 100644 --- a/src/drunc/data/process_manager/process-manager-k8s-pocket.json +++ b/src/drunc/data/process_manager/process-manager-k8s-pocket.json @@ -13,6 +13,10 @@ "publish_timeout": 2 }, "environment": { - "GRPC_ENABLE_FORK_SUPPORT": "false" + "GRPC_ENABLE_FORK_SUPPORT": "false", + "DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)" } } diff --git a/src/drunc/data/process_manager/ssh-pocket-kafka.json b/src/drunc/data/process_manager/ssh-pocket-kafka.json index 5b19f9cf2..e2c0dfac3 100644 --- a/src/drunc/data/process_manager/ssh-pocket-kafka.json +++ b/src/drunc/data/process_manager/ssh-pocket-kafka.json @@ -14,6 +14,10 @@ }, "environment": { - "GRPC_ENABLE_FORK_SUPPORT": "false" + "GRPC_ENABLE_FORK_SUPPORT": "false", + "DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)" } } diff --git a/src/drunc/data/process_manager/ssh-standalone-paramiko-client.json b/src/drunc/data/process_manager/ssh-standalone-paramiko-client.json index 6e226e9cd..a3c0dc789 100644 --- a/src/drunc/data/process_manager/ssh-standalone-paramiko-client.json +++ 
b/src/drunc/data/process_manager/ssh-standalone-paramiko-client.json @@ -10,7 +10,11 @@ "type": "dummy" }, "environment": { - "GRPC_ENABLE_FORK_SUPPORT": "false" + "GRPC_ENABLE_FORK_SUPPORT": "false", + "DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", + "DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)" }, "opmon_uri": { "path": "./info.json", diff --git a/src/drunc/data/process_manager/ssh-standalone.json b/src/drunc/data/process_manager/ssh-standalone.json index e21a29709..99c5bc353 100644 --- a/src/drunc/data/process_manager/ssh-standalone.json +++ b/src/drunc/data/process_manager/ssh-standalone.json @@ -11,10 +11,10 @@ }, "environment": { "GRPC_ENABLE_FORK_SUPPORT": "false", - "DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", - "DUNEDAQ_ERS_FATAL": "erstrace,lstdout,protobufstream(monkafka.cern.ch:30092)", - "DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)", - "DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)" + "DUNEDAQ_ERS_ERROR": "erstrace,throttle,lstdout", + "DUNEDAQ_ERS_FATAL": "erstrace,lstdout", + "DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout", + "DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout" }, "opmon_uri": { "path": "./info.json", From 102f3049578feeb883ee2cdd9ca83a3a1d06be3f Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 13 Feb 2026 18:04:56 +0100 Subject: [PATCH 09/11] There is now a log of what processes are expected to be dead. 
6pm Friday = bad time to test --- src/drunc/process_manager/process_manager.py | 62 ++++++++++++++++++- .../process_manager/ssh_process_manager.py | 15 ++++- 2 files changed, 73 insertions(+), 4 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 0ec1873d8..17a3f4350 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -82,8 +82,12 @@ def __init__( interval_s = getattr(self.configuration.get_data(), "interval_s", 10.0) self.authoriser = DummyAuthoriser(dach, SystemType.PROCESS_MANAGER) - self.process_store = {} # dict[str, sh.RunningCommand] - self.boot_request = {} # dict[str, BootRequest] + self.process_store = {} # dict[str, sh.RunningCommand] # str = uuid + self.boot_request = {} # dict[str, BootRequest] # str = uuid + + # Define a list of applications that we expect to die, and a lock to read the memory + self.dead_process_lock = threading.Lock() + self.expected_dead_applications = {} # dict[str, BootRequest] # str == uuid # TODO, probably need to think of a better way to do this? # Maybe I should "bind" the commands to their methods, and have something looping over this list to generate the gRPC functions @@ -301,6 +305,9 @@ def terminate( try: response = self._terminate_impl() + # Remove the list of dead applications, they are expected to be dead. 
+ with self.dead_process_lock: + self.expected_dead_applications.clear() except NotImplementedError: return ProcessInstanceList( name=self.name, @@ -401,6 +408,7 @@ def flush( ret = [] for uuid in self._get_process_uid(request): + # Some unknown process was found, assume it is dead and move on if uuid not in self.boot_request: pu = ProcessUUID(uuid=uuid) pi = ProcessInstance( @@ -440,6 +448,10 @@ def flush( return_code=return_code, uuid=pu, ) + # If we know that this process has died intentionally, remove it from + # tracking + self.remove_process_from_expected_dead_processes(uuid) + del self.process_store[uuid] ret += [pi] @@ -605,6 +617,52 @@ def _match_processes_against_query( return processes + def add_process_to_expected_dead_processes(self, uuid: str) -> None: + """ + Add the process to the list of processes that are expected to die. Needed as the + OpMon publisher publishes the state when a process dies unexpectedly, and these + processes require tracking. + + Args: + uuid: str - process UUID to add to the dict + + Returns: + None + + Raises: + None - an unknown UUID is only logged as an error + """ + with self.dead_process_lock: + if uuid in self.boot_request: + br = BootRequest() + br.CopyFrom(self.boot_request[uuid]) + self.expected_dead_applications[uuid] = br + else: + err_msg = f"Unexpected process with UUID {uuid} requested to be added to the list of dead applications!" + self.log.error(err_msg) + def remove_process_from_expected_dead_processes(self, uuid: str) -> None: + """ + Remove the process from the list of processes expected to die. Needed as + the OpMon publisher publishes the state when a process dies unexpectedly, and + these processes require tracking.
+ + Args: + uuid: str - process UUID to remove from the dict + + Returns: + None + + Raises: + None - an unknown UUID is only logged as an error + """ + with self.dead_process_lock: + if uuid in self.expected_dead_applications: + self.expected_dead_applications.pop(uuid, None) + else: + err_msg = f"Unexpected process with UUID {uuid} requested to be removed from the list of expected_dead_applications!" + self.log.error(err_msg) + def _get_process_uid( self, query: ProcessQuery, diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 3571f0636..a3db1d81b 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -108,7 +108,11 @@ def kill_processes(self, uuids: list) -> ProcessInstanceList: ret = [] for proc_uuid in uuids: - app_name = self.boot_request[proc_uuid].process_description.metadata.name + process_boot_request = self.boot_request[proc_uuid] + app_name = process_boot_request.process_description.metadata.name + + # Make sure we know that the process is dying + self.add_process_to_expected_dead_processes(proc_uuid) # Terminate process if still alive if self.ssh_lifetime_manager.is_process_alive(proc_uuid): @@ -473,15 +477,22 @@ def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: same_uuid_br.CopyFrom(self.boot_request[uuid]) same_uuid = uuid + # Keep track of what applications are expected to be killed so they are not + # reported as unexpectedly dead + self.add_process_to_expected_dead_processes(uuid) + if self.ssh_lifetime_manager.is_process_alive(uuid): self.ssh_lifetime_manager.terminate_process(uuid) self.ssh_lifetime_manager.cleanup_process(uuid) del self.boot_request[uuid] - del uuid ret = [self.__boot(same_uuid_br, same_uuid)] + # Remove the application from the list of dead applications + self.remove_process_from_expected_dead_processes(uuid) + + del uuid del same_uuid_br del same_uuid From
fbb75e33a33dce58605e467834c63826c87731c2 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Mon, 16 Feb 2026 13:10:50 +0100 Subject: [PATCH 10/11] Adding a simple check to ensure expected dead processes are not advertised --- src/drunc/process_manager/process_manager.py | 25 ++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 17a3f4350..343a4beaa 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -216,6 +216,11 @@ def find_by_uuid(pi_list, target_uuid: str): n_dead_prev = n_dead diff_set = dead_processes - dead_processes_prev for diff in diff_set: + if diff in self.expected_dead_applications: + self.log.debug( + f"Process {diff} already expected to be dead, continuing" + ) + continue pi = find_by_uuid(results, diff) err_msg = f"Process {pi.process_description.metadata.name} with UUID {pi.uuid.uuid} has died with a return code {pi.return_code}" # easiest way to send one to Rich and ERS @@ -306,8 +311,7 @@ def terminate( try: response = self._terminate_impl() # Remove the list of dead applications, they are expected to be dead. - with self.dead_process_lock: - self.expected_dead_applications.clear() + self.clear_dead_processes() except NotImplementedError: return ProcessInstanceList( name=self.name, @@ -436,6 +440,7 @@ def flush( except Exception: pass + # If a process is already dead, remove it from the process store if not self.process_store[uuid].is_alive(): pi = ProcessInstance( process_description=pd, @@ -663,6 +668,22 @@ def remove_process_from_expected_dead_processes(self, uuid: str) -> None: err_msg = f"Unexpected process with UUID {uuid} requested to be removed from the list of expected_dead_applications!" 
self.log.error(err_msg) + def clear_dead_processes(self) -> None: + """ + Remove all processes from the tracker of expected dead processes + + Args: + None + + Returns: + None + + Raises: + None + """ + with self.dead_process_lock: + self.expected_dead_applications.clear() + def _get_process_uid( self, query: ProcessQuery, From 59ed05458ca65d515db876f790b58a6ed02e536b Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Mon, 16 Feb 2026 15:35:09 +0100 Subject: [PATCH 11/11] Fixing terminate OpMon --- run_conf1.data.xml | 3132 ++++++++++++++++++ src/drunc/process_manager/process_manager.py | 22 + 2 files changed, 3154 insertions(+) create mode 100644 run_conf1.data.xml diff --git a/run_conf1.data.xml b/run_conf1.data.xml new file mode 100644 index 000000000..109092f40 --- /dev/null +++ b/run_conf1.data.xml @@ -0,0 +1,3132 @@ + + + + + + + + + + + + + + + + + + + + + + + + +]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 18a70da6a..582a07e2b 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -309,6 +309,7 @@ def terminate( self.log.debug(f"{self.name} running terminate") try: + self.mark_all_processes_as_expected_dead() response = self._terminate_impl() # Remove the list of dead applications, they are expected to be dead. self.clear_dead_processes() @@ -676,6 +677,27 @@ def remove_process_from_expected_dead_processes(self, uuid: str) -> None: err_msg = f"Unexpected process with UUID {uuid} requested to be removed from the list of expected_dead_applications!" 
self.log.error(err_msg) + def mark_all_processes_as_expected_dead(self) -> None: + """ + Add all known processes to the tracker of expected dead processes + + Args: + None + + Returns: + None + + Raises: + None + """ + with self.dead_process_lock: + for proc_uuid in self.boot_request: + if proc_uuid in self.expected_dead_applications: + continue + self.expected_dead_applications[proc_uuid] = self.boot_request[ + proc_uuid + ] + def clear_dead_processes(self) -> None: """ Remove all processes from the tracker of expected dead processes