Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion recipe/simple_use_case/relax_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class BaseStageWorker:
def __init__(self, tq_config, tracker, worker_id: int, config: "DemoConfig"):
tq.init(tq_config)
self.tq_client = tq.get_client()
controller = ray.get_actor("TransferQueueController")
controller = ray.get_actor("TransferQueueController", namespace="transfer_queue")
self.cfg = ray.get(controller.get_config.remote())
self.tracker = tracker
self.worker_id = worker_id
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/test_kv_interface_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def tq_system(ray_init, backend_name):
@pytest.fixture
def controller(tq_system):
"""Get the TransferQueueController actor for direct verification."""
controller = ray.get_actor("TransferQueueController")
controller = ray.get_actor("TransferQueueController", namespace="transfer_queue")
yield controller


Expand Down
8 changes: 4 additions & 4 deletions transfer_queue/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def _init_from_existing() -> bool:
global _TQ_CONTROLLER
try:
if _TQ_CONTROLLER is None:
_TQ_CONTROLLER = ray.get_actor("TransferQueueController")
_TQ_CONTROLLER = ray.get_actor("TransferQueueController", namespace="transfer_queue")

except ValueError:
logger.info("Called _init_from_existing() but TransferQueueController has not been initialized yet.")
Expand Down Expand Up @@ -174,9 +174,9 @@ def init(conf: DictConfig | None = None) -> DictConfig | None:

try:
global _TQ_CONTROLLER
_TQ_CONTROLLER = TransferQueueController.options(name="TransferQueueController").remote( # type: ignore[attr-defined]
sampler=sampler, polling_mode=final_conf.controller.polling_mode
)
_TQ_CONTROLLER = TransferQueueController.options( # type: ignore[attr-defined]
name="TransferQueueController", namespace="transfer_queue"
).remote(sampler=sampler, polling_mode=final_conf.controller.polling_mode)
logger.info("TransferQueueController has been created.")
except ValueError:
logger.info("Some other rank has initialized TransferQueueController. Try to connect to existing controller.")
Expand Down
6 changes: 4 additions & 2 deletions transfer_queue/storage/clients/ray_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ def __init__(self, config=None):

# initialize actor
try:
self.storage_actor = ray.get_actor("RayObjectRefStorage")
self.storage_actor = ray.get_actor("RayObjectRefStorage", namespace="transfer_queue")
except ValueError:
self.storage_actor = RayObjectRefStorage.options(name="RayObjectRefStorage", get_if_exists=False).remote()
self.storage_actor = RayObjectRefStorage.options(
name="RayObjectRefStorage", namespace="transfer_queue", get_if_exists=False
).remote()

def put(self, keys: list[str], values: list[Any]) -> list[Any] | None:
"""
Expand Down
2 changes: 1 addition & 1 deletion tutorial/06_streaming_dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def update_worker(
# Step 1: Create StreamingDataset
# This dataset integrates with TransferQueue and handles batch retrieval

controller = ray.get_actor("TransferQueueController")
controller = ray.get_actor("TransferQueueController", namespace="transfer_queue")
config = ray.get(controller.get_config.remote())

dataset = StreamingDataset(
Expand Down
Loading