Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 79 additions & 19 deletions src/drunc/process_manager/k8s_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,17 +1010,29 @@ def _get_pod_volumes_and_mounts(

return pod_volumes, container_volume_mounts

def _get_tree_labels(self, tree_id: str, podname: str) -> dict[str, str]:
def _get_tree_labels(
self, tree_id: str, podname: str, boot_request: BootRequest
) -> dict[str, str]:
"""
Determines the role of a pod based on its tree_id,
Determines the role of a pod based on its tree_id and executable type,
and returns a dictionary of labels to be applied.

Role mapping: tree_id '0' -> root-controller, depth 0 -> infrastructure-applications,
depth 1 -> segment-controller, depth 2 -> application, otherwise 'unknown'.
Role mapping:
- tree_id '0' -> root-controller
- tree_id root != '0' (e.g. '1', '2') -> infrastructure-applications
- tree_id starts with '0' + controller exe -> segment-controller
- tree_id starts with '0' + non-controller -> application
- otherwise -> unknown

The executable name is used to distinguish segment-controllers from
applications, since both can appear at any depth under the root segment
and share the same tree_id format.

Args:
tree_id: The dot-separated tree identifier string (e.g. '0', '1', '0.1', '0.1.2').
podname: The name of the pod (used for logging).
boot_request: The BootRequest for this process, used to inspect the
executable and determine if it is a controller.

Returns:
A dictionary of labels containing 'tree-id.{drunc_label}' and
Expand All @@ -1032,24 +1044,44 @@ def _get_tree_labels(self, tree_id: str, podname: str) -> dict[str, str]:

if not tree_id:
role = "unknown"
elif tree_id == "0":
role = "root-controller"
else:
# Count the depth
depth = tree_id.count(".")
if depth == 0:
role = "infrastructure-applications"
elif depth == 1:
elif self._is_controller_executable(boot_request):
if tree_id == "0":
role = "root-controller"
elif tree_id.startswith("0."):
role = "segment-controller"
elif depth == 2:
else:
role = "infrastructure-applications" # controller outside segment tree

else:
if tree_id.startswith("0."):
role = "application"
else:
role = "infrastructure-applications"

labels[f"role.{self.drunc_label}"] = role
self.log.info(
f"Assigning labels for '{podname}': role={role}, tree-id={tree_id}"
)
return labels

def _is_controller_executable(self, boot_request: BootRequest) -> bool:
    """
    Report whether the boot request launches a drunc-controller.

    Every executable-and-arguments entry of the request's process
    description is examined; the check succeeds as soon as one entry's
    ``exec`` field is exactly the string 'drunc-controller'.

    Args:
        boot_request: The BootRequest to inspect.

    Returns:
        True if at least one entry's exec equals 'drunc-controller',
        False otherwise (including when there are no entries).
    """
    entries = boot_request.process_description.executable_and_arguments
    # any() stops at the first match, mirroring an early return from a loop.
    return any(entry.exec == "drunc-controller" for entry in entries)

def _build_container_env(
self, boot_request: BootRequest, tree_labels: dict[str, str]
) -> list[client.V1EnvVar]:
Expand Down Expand Up @@ -2158,7 +2190,7 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance:
session = boot_request.process_description.metadata.session
podname = boot_request.process_description.metadata.name
tree_labels = self._get_tree_labels(
boot_request.process_description.metadata.tree_id, podname
boot_request.process_description.metadata.tree_id, podname, boot_request
)
# Pre-checks (Session validation, NodePort collision)
self._run_pre_boot_checks(session, podname, boot_request)
Expand Down Expand Up @@ -2393,10 +2425,12 @@ def _kill_impl(self, query: ProcessQuery) -> ProcessInstanceList:

Performs an ordered shutdown of matched processes by their role labels:
unknown → application → segment-controller → root-controller →
infrastructure-applications. Each stage issues delete requests and
blocks until the watcher thread confirms all pods in that stage have
terminated (or a timeout is reached). After all pods are killed,
cleans up managed namespaces if no tracked processes remain.
infrastructure-applications. Segment-controllers are further split
by tree-id depth, shutting down the deepest first (leaves before
parents). Each stage issues delete requests and blocks until the
watcher thread confirms all pods in that stage have terminated
(or a timeout is reached). After all pods are killed, cleans up
managed namespaces if no tracked processes remain.

Args:
query - a ProcessQuery specifying which processes to kill
Expand Down Expand Up @@ -2468,20 +2502,46 @@ def kill_and_wait(uuids, grace_period=None) -> None:
"root-controller": [],
"infrastructure-applications": [],
}
# For segment-controllers, also track tree-id depth per uuid
segment_controller_depths: dict[str, int] = {}

uuid_label_key = f"uuid.{self.drunc_label}"
role_label_key = f"role.{self.drunc_label}"
tree_id_label_key = f"tree-id.{self.drunc_label}"

for pod in all_pods:
uuid = pod.metadata.labels.get(uuid_label_key)
if uuid and uuid in targeted_uuids:
role = pod.metadata.labels.get(role_label_key, "unknown")
pods_by_role[role].append(uuid)
if role == "segment-controller":
tree_id = pod.metadata.labels.get(tree_id_label_key, "")
segment_controller_depths[uuid] = tree_id.count(".") if tree_id else 0

# Kill in stages using our sorted lists
for role in PROCESS_SHUTDOWN_ORDERING:
uuids_in_step = pods_by_role[role]
if uuids_in_step:
if not uuids_in_step:
continue

if role == "segment-controller":
# Group segment-controllers by depth and kill deepest first
by_depth: dict[int, list[str]] = {}
for uuid in uuids_in_step:
depth = segment_controller_depths.get(uuid, 0)
by_depth.setdefault(depth, []).append(uuid)

for depth in sorted(by_depth.keys(), reverse=True):
depth_uuids = by_depth[depth]
self.log.info(
f"--- Termination Step: Shutting down role '{role}' at depth {depth} "
f"({len(depth_uuids)} pod(s)) ---"
)
kill_and_wait(depth_uuids) # This call is blocking
self.log.info(
f"--- Termination Step: Role '{role}' at depth {depth} complete ---"
)
else:
self.log.info(
f"--- Termination Step: Shutting down role '{role}' ({len(uuids_in_step)} pod(s)) ---"
)
Expand Down
3 changes: 2 additions & 1 deletion src/drunc/process_manager/oks_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ def collect_apps(
apps.append(app)

# Get all the enabled applications of this segment
app_index = 0
# Start app_index after sub-segment indices to avoid tree_id collisions
app_index = len(segment_obj.segments)
for app in segment_obj.applications:
log.debug(f"Considering app {app.id}")
if "Resource" in app.oksTypes():
Expand Down
2 changes: 1 addition & 1 deletion src/drunc/process_manager/process_manager_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ def _build_boot_request_dummy_boot(
# ----- RPC methods -----
def terminate(
self,
timeout: int | float = 60,
timeout: int | float = 130,
) -> ProcessInstanceList:
request = Request(token=copy_token(self.token))

Expand Down
Loading