From b20656abd07c8f0711ff10c1e4a5621dc87c7986 Mon Sep 17 00:00:00 2001 From: huniu20 Date: Thu, 4 Jun 2026 17:09:02 +0800 Subject: [PATCH] [feat] Support cross-job actor discovery via explicit namespace When multiple Ray Jobs share the same Ray cluster, Named Actors are isolated by namespace. Without an explicit namespace, a TQ Controller created by one job is invisible to workers in another job. This commit adds namespace="transfer_queue" to: - ray.get_actor() in _init_from_existing() - TransferQueueController.options() in init() - the RayObjectRefStorage lookup and creation in ray_storage_client.py - direct ray.get_actor("TransferQueueController") calls in the recipe, e2e test, and tutorial This ensures that the TQ Controller is always registered and discovered in the fixed "transfer_queue" namespace, enabling cross-job TQ sharing (e.g., a teacher server job creates TQ, and a trainer job connects to it). This change is backward-compatible for users of the tq API: single-job usage is unaffected since the namespace is consistent between creation and discovery. Code that looks up these actors directly with ray.get_actor() must now pass namespace="transfer_queue". Signed-off-by: huniu20 --- recipe/simple_use_case/relax_demo.py | 2 +- tests/e2e/test_kv_interface_e2e.py | 2 +- transfer_queue/interface.py | 8 ++++---- transfer_queue/storage/clients/ray_storage_client.py | 6 ++++-- tutorial/06_streaming_dataloader.py | 2 +- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/recipe/simple_use_case/relax_demo.py b/recipe/simple_use_case/relax_demo.py index 1bdfd1e3..7e3efa83 100644 --- a/recipe/simple_use_case/relax_demo.py +++ b/recipe/simple_use_case/relax_demo.py @@ -131,7 +131,7 @@ class BaseStageWorker: def __init__(self, tq_config, tracker, worker_id: int, config: "DemoConfig"): tq.init(tq_config) self.tq_client = tq.get_client() - controller = ray.get_actor("TransferQueueController") + controller = ray.get_actor("TransferQueueController", namespace="transfer_queue") self.cfg = ray.get(controller.get_config.remote()) self.tracker = tracker self.worker_id = worker_id diff --git a/tests/e2e/test_kv_interface_e2e.py b/tests/e2e/test_kv_interface_e2e.py index a9682e3b..ae9dddc7 100644 --- a/tests/e2e/test_kv_interface_e2e.py +++ 
b/tests/e2e/test_kv_interface_e2e.py @@ -154,7 +154,7 @@ def tq_system(ray_init, backend_name): @pytest.fixture def controller(tq_system): """Get the TransferQueueController actor for direct verification.""" - controller = ray.get_actor("TransferQueueController") + controller = ray.get_actor("TransferQueueController", namespace="transfer_queue") yield controller diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 933b42c3..82ceacaf 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -93,7 +93,7 @@ def _init_from_existing() -> bool: global _TQ_CONTROLLER try: if _TQ_CONTROLLER is None: - _TQ_CONTROLLER = ray.get_actor("TransferQueueController") + _TQ_CONTROLLER = ray.get_actor("TransferQueueController", namespace="transfer_queue") except ValueError: logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.") @@ -174,9 +174,9 @@ def init(conf: DictConfig | None = None) -> DictConfig | None: try: global _TQ_CONTROLLER - _TQ_CONTROLLER = TransferQueueController.options(name="TransferQueueController").remote( # type: ignore[attr-defined] - sampler=sampler, polling_mode=final_conf.controller.polling_mode - ) + _TQ_CONTROLLER = TransferQueueController.options( # type: ignore[attr-defined] + name="TransferQueueController", namespace="transfer_queue" + ).remote(sampler=sampler, polling_mode=final_conf.controller.polling_mode) logger.info("TransferQueueController has been created.") except ValueError: logger.info("Some other rank has initialized TransferQueueController. 
Try to connect to existing controller.") diff --git a/transfer_queue/storage/clients/ray_storage_client.py b/transfer_queue/storage/clients/ray_storage_client.py index c85fa438..add04974 100644 --- a/transfer_queue/storage/clients/ray_storage_client.py +++ b/transfer_queue/storage/clients/ray_storage_client.py @@ -57,9 +57,11 @@ def __init__(self, config=None): # initialize actor try: - self.storage_actor = ray.get_actor("RayObjectRefStorage") + self.storage_actor = ray.get_actor("RayObjectRefStorage", namespace="transfer_queue") except ValueError: - self.storage_actor = RayObjectRefStorage.options(name="RayObjectRefStorage", get_if_exists=False).remote() + self.storage_actor = RayObjectRefStorage.options( + name="RayObjectRefStorage", namespace="transfer_queue", get_if_exists=False + ).remote() def put(self, keys: list[str], values: list[Any]) -> list[Any] | None: """ diff --git a/tutorial/06_streaming_dataloader.py b/tutorial/06_streaming_dataloader.py index 83591683..0571e108 100644 --- a/tutorial/06_streaming_dataloader.py +++ b/tutorial/06_streaming_dataloader.py @@ -184,7 +184,7 @@ def update_worker( # Step 1: Create StreamingDataset # This dataset integrates with TransferQueue and handles batch retrieval - controller = ray.get_actor("TransferQueueController") + controller = ray.get_actor("TransferQueueController", namespace="transfer_queue") config = ray.get(controller.get_config.remote()) dataset = StreamingDataset(