diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index f7d80d5e0..165f760b6 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -1010,17 +1010,29 @@ def _get_pod_volumes_and_mounts( return pod_volumes, container_volume_mounts - def _get_tree_labels(self, tree_id: str, podname: str) -> dict[str, str]: + def _get_tree_labels( + self, tree_id: str, podname: str, boot_request: BootRequest + ) -> dict[str, str]: """ - Determines the role of a pod based on its tree_id, + Determines the role of a pod based on its tree_id and executable type, and returns a dictionary of labels to be applied. - Role mapping: tree_id '0' -> root-controller, depth 0 -> infrastructure-applications, - depth 1 -> segment-controller, depth 2 -> application, otherwise 'unknown'. + Role mapping: + - tree_id '0' -> root-controller + - tree_id root != '0' (e.g. '1', '2') -> infrastructure-applications + - tree_id starts with '0.' + controller exe -> segment-controller + - tree_id starts with '0.' + non-controller -> application + - empty tree_id -> unknown + + The executable name is used to distinguish segment-controllers from + applications, since both can appear at any depth under the root segment + and share the same tree_id format. Args: tree_id: The dot-separated tree identifier string (e.g. '0', '1', '0.1', '0.1.2'). podname: The name of the pod (used for logging). + boot_request: The BootRequest for this process, used to inspect the + executable and determine if it is a controller. 
Returns: A dictionary of labels containing 'tree-id.{drunc_label}' and @@ -1032,17 +1044,19 @@ def _get_tree_labels(self, tree_id: str, podname: str) -> dict[str, str]: if not tree_id: role = "unknown" - elif tree_id == "0": - role = "root-controller" - else: - # Count the depth - depth = tree_id.count(".") - if depth == 0: - role = "infrastructure-applications" - elif depth == 1: + elif self._is_controller_executable(boot_request): + if tree_id == "0": + role = "root-controller" + elif tree_id.startswith("0."): role = "segment-controller" - elif depth == 2: + else: + role = "infrastructure-applications" # controller outside segment tree + + else: + if tree_id.startswith("0."): role = "application" + else: + role = "infrastructure-applications" labels[f"role.{self.drunc_label}"] = role self.log.info( @@ -1050,6 +1064,24 @@ def _get_tree_labels(self, tree_id: str, podname: str) -> dict[str, str]: ) return labels + def _is_controller_executable(self, boot_request: BootRequest) -> bool: + """ + Check whether the boot request's main executable is a drunc-controller. + + Inspects all executable-and-arguments entries in the process description + for the 'drunc-controller' executable name. + + Args: + boot_request: The BootRequest to inspect. + + Returns: + True if any executable entry is 'drunc-controller', False otherwise. 
+ """ + for e_and_a in boot_request.process_description.executable_and_arguments: + if e_and_a.exec == "drunc-controller": + return True + return False + def _build_container_env( self, boot_request: BootRequest, tree_labels: dict[str, str] ) -> list[client.V1EnvVar]: @@ -2158,7 +2190,7 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: session = boot_request.process_description.metadata.session podname = boot_request.process_description.metadata.name tree_labels = self._get_tree_labels( - boot_request.process_description.metadata.tree_id, podname + boot_request.process_description.metadata.tree_id, podname, boot_request ) # Pre-checks (Session validation, NodePort collision) self._run_pre_boot_checks(session, podname, boot_request) @@ -2393,10 +2425,12 @@ def _kill_impl(self, query: ProcessQuery) -> ProcessInstanceList: Performs an ordered shutdown of matched processes by their role labels: unknown → application → segment-controller → root-controller → - infrastructure-applications. Each stage issues delete requests and - blocks until the watcher thread confirms all pods in that stage have - terminated (or a timeout is reached). After all pods are killed, - cleans up managed namespaces if no tracked processes remain. + infrastructure-applications. Segment-controllers are further split + by tree-id depth, shutting down the deepest first (leaves before + parents). Each stage issues delete requests and blocks until the + watcher thread confirms all pods in that stage have terminated + (or a timeout is reached). After all pods are killed, cleans up + managed namespaces if no tracked processes remain. 
Args: query - a ProcessQuery specifying which processes to kill @@ -2468,20 +2502,46 @@ def kill_and_wait(uuids, grace_period=None) -> None: "root-controller": [], "infrastructure-applications": [], } + # For segment-controllers, also track tree-id depth per uuid + segment_controller_depths: dict[str, int] = {} uuid_label_key = f"uuid.{self.drunc_label}" role_label_key = f"role.{self.drunc_label}" + tree_id_label_key = f"tree-id.{self.drunc_label}" for pod in all_pods: uuid = pod.metadata.labels.get(uuid_label_key) if uuid and uuid in targeted_uuids: role = pod.metadata.labels.get(role_label_key, "unknown") pods_by_role[role].append(uuid) + if role == "segment-controller": + tree_id = pod.metadata.labels.get(tree_id_label_key, "") + segment_controller_depths[uuid] = tree_id.count(".") if tree_id else 0 # Kill in stages using our sorted lists for role in PROCESS_SHUTDOWN_ORDERING: uuids_in_step = pods_by_role[role] - if uuids_in_step: + if not uuids_in_step: + continue + + if role == "segment-controller": + # Group segment-controllers by depth and kill deepest first + by_depth: dict[int, list[str]] = {} + for uuid in uuids_in_step: + depth = segment_controller_depths.get(uuid, 0) + by_depth.setdefault(depth, []).append(uuid) + + for depth in sorted(by_depth.keys(), reverse=True): + depth_uuids = by_depth[depth] + self.log.info( + f"--- Termination Step: Shutting down role '{role}' at depth {depth} " + f"({len(depth_uuids)} pod(s)) ---" + ) + kill_and_wait(depth_uuids) # This call is blocking + self.log.info( + f"--- Termination Step: Role '{role}' at depth {depth} complete ---" + ) + else: self.log.info( f"--- Termination Step: Shutting down role '{role}' ({len(uuids_in_step)} pod(s)) ---" ) diff --git a/src/drunc/process_manager/oks_parser.py b/src/drunc/process_manager/oks_parser.py index 8f2d9573e..f679e7f1a 100644 --- a/src/drunc/process_manager/oks_parser.py +++ b/src/drunc/process_manager/oks_parser.py @@ -223,7 +223,8 @@ def collect_apps( apps.append(app) # 
Get all the enabled applications of this segment - app_index = 0 + # Start app_index after sub-segment indices to avoid tree_id collisions + app_index = len(segment_obj.segments) for app in segment_obj.applications: log.debug(f"Considering app {app.id}") if "Resource" in app.oksTypes(): diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 6411c3eab..cd7169f99 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -819,7 +819,7 @@ def _build_boot_request_dummy_boot( # ----- RPC methods ----- def terminate( self, - timeout: int | float = 60, + timeout: int | float = 130, ) -> ProcessInstanceList: request = Request(token=copy_token(self.token))