diff --git a/recipe/simple_use_case/relax_demo.py b/recipe/simple_use_case/relax_demo.py index 1bdfd1e3..7e3efa83 100644 --- a/recipe/simple_use_case/relax_demo.py +++ b/recipe/simple_use_case/relax_demo.py @@ -131,7 +131,7 @@ class BaseStageWorker: def __init__(self, tq_config, tracker, worker_id: int, config: "DemoConfig"): tq.init(tq_config) self.tq_client = tq.get_client() - controller = ray.get_actor("TransferQueueController") + controller = ray.get_actor("TransferQueueController", namespace="transfer_queue") self.cfg = ray.get(controller.get_config.remote()) self.tracker = tracker self.worker_id = worker_id diff --git a/tests/e2e/test_kv_interface_e2e.py b/tests/e2e/test_kv_interface_e2e.py index a9682e3b..ae9dddc7 100644 --- a/tests/e2e/test_kv_interface_e2e.py +++ b/tests/e2e/test_kv_interface_e2e.py @@ -154,7 +154,7 @@ def tq_system(ray_init, backend_name): @pytest.fixture def controller(tq_system): """Get the TransferQueueController actor for direct verification.""" - controller = ray.get_actor("TransferQueueController") + controller = ray.get_actor("TransferQueueController", namespace="transfer_queue") yield controller diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 933b42c3..82ceacaf 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -93,7 +93,7 @@ def _init_from_existing() -> bool: global _TQ_CONTROLLER try: if _TQ_CONTROLLER is None: - _TQ_CONTROLLER = ray.get_actor("TransferQueueController") + _TQ_CONTROLLER = ray.get_actor("TransferQueueController", namespace="transfer_queue") except ValueError: logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.") @@ -174,9 +174,9 @@ def init(conf: DictConfig | None = None) -> DictConfig | None: try: global _TQ_CONTROLLER - _TQ_CONTROLLER = TransferQueueController.options(name="TransferQueueController").remote( # type: ignore[attr-defined] - sampler=sampler, polling_mode=final_conf.controller.polling_mode - ) + _TQ_CONTROLLER = TransferQueueController.options( # type: ignore[attr-defined] + name="TransferQueueController", namespace="transfer_queue" + ).remote(sampler=sampler, polling_mode=final_conf.controller.polling_mode) logger.info("TransferQueueController has been created.") except ValueError: logger.info("Some other rank has initialized TransferQueueController. Try to connect to existing controller.") diff --git a/transfer_queue/storage/clients/ray_storage_client.py b/transfer_queue/storage/clients/ray_storage_client.py index c85fa438..add04974 100644 --- a/transfer_queue/storage/clients/ray_storage_client.py +++ b/transfer_queue/storage/clients/ray_storage_client.py @@ -57,9 +57,11 @@ def __init__(self, config=None): # initialize actor try: - self.storage_actor = ray.get_actor("RayObjectRefStorage") + self.storage_actor = ray.get_actor("RayObjectRefStorage", namespace="transfer_queue") except ValueError: - self.storage_actor = RayObjectRefStorage.options(name="RayObjectRefStorage", get_if_exists=False).remote() + self.storage_actor = RayObjectRefStorage.options( + name="RayObjectRefStorage", namespace="transfer_queue", get_if_exists=False + ).remote() def put(self, keys: list[str], values: list[Any]) -> list[Any] | None: """ diff --git a/tutorial/06_streaming_dataloader.py b/tutorial/06_streaming_dataloader.py index 83591683..0571e108 100644 --- a/tutorial/06_streaming_dataloader.py +++ b/tutorial/06_streaming_dataloader.py @@ -184,7 +184,7 @@ def update_worker( # Step 1: Create StreamingDataset # This dataset integrates with TransferQueue and handles batch retrieval - controller = ray.get_actor("TransferQueueController") + controller = ray.get_actor("TransferQueueController", namespace="transfer_queue") config = ray.get(controller.get_config.remote()) dataset = StreamingDataset(