[fix] Honor dedicated update_data_status thread to prevent race condition #114
Closed
0oshowero0 wants to merge 2 commits into
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass
0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍
Title changed: data_status_update thread to prevent race condition → update_data_status thread to prevent race condition
Pull request overview
This PR reroutes storage-manager “data ready” notifications (NOTIFY_DATA_UPDATE) to be handled on the controller’s main request socket (request_handle_socket), removing the dedicated data-status-update socket/thread and updating tests accordingly.
Changes:
- Storage managers now connect to request_handle_socket to send NOTIFY_DATA_UPDATE.
- Controller removes data_status_update_socket / background update thread and handles NOTIFY_DATA_UPDATE in _process_request.
- Tests are updated to stop advertising/expecting data_status_update_socket ports.
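
The routing described in the list above can be sketched as a single handler thread that owns all state. This is a minimal illustration, not the project's code: `MiniController`, `GET_STATUS`, `STOP`, and the `queue.Queue` standing in for request_handle_socket are all assumptions; only NOTIFY_DATA_UPDATE and _process_request are names taken from the review.

```python
import queue
import threading

# Only NOTIFY_DATA_UPDATE appears in the PR text; the other types are hypothetical.
NOTIFY_DATA_UPDATE = "NOTIFY_DATA_UPDATE"
GET_STATUS = "GET_STATUS"
STOP = "STOP"


class MiniController:
    """Toy controller: one thread owns every read and write of `status`."""

    def __init__(self):
        self.requests = queue.Queue()  # stands in for request_handle_socket
        self.status = {}               # global_idx -> ready flag
        self._thread = threading.Thread(target=self._process_request, daemon=True)

    def start(self):
        self._thread.start()

    def _process_request(self):
        # All message types are handled on this one thread, so updates and
        # reads of `status` are serialized without an explicit lock.
        while True:
            kind, payload, reply = self.requests.get()
            if kind == NOTIFY_DATA_UPDATE:
                for idx in payload:
                    self.status[idx] = True
                reply.put("ACK")
            elif kind == GET_STATUS:
                reply.put(sorted(self.status))
            elif kind == STOP:
                reply.put("BYE")
                return

    def call(self, kind, payload=None):
        """Send one request and block until the handler thread replies."""
        reply = queue.Queue(maxsize=1)
        self.requests.put((kind, payload, reply))
        return reply.get(timeout=5)


controller = MiniController()
controller.start()
ack = controller.call(NOTIFY_DATA_UPDATE, [0, 1, 2])
ready = controller.call(GET_STATUS)
controller.call(STOP)
```

The trade-off in this design is that a slow request delays every notification queued behind it, which is one reason a project might prefer a dedicated update thread with explicit locking instead.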
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| transfer_queue/storage/managers/base.py | Routes notify_data_update() to request_handle_socket instead of the removed data-status-update socket. |
| transfer_queue/controller.py | Removes the data-status-update socket/thread and processes NOTIFY_DATA_UPDATE on the request-handling thread; also removes partition locking. |
| tests/test_ray_p2p.py | Updates mock controller port map to remove data_status_update_socket. |
| tests/test_async_simple_storage_manager.py | Updates mock controller port maps / setup to remove data_status_update_socket. |
Comments suppressed due to low confidence (1)
transfer_queue/controller.py:882
to_snapshot() now copies partition state without synchronization. If another thread expands/writes tensors concurrently, the snapshot can become internally inconsistent. If data_status_lock is reintroduced, it also must be skipped during deepcopy and used to guard the snapshot copy.

    for name, value in self.__dict__.items():
        if isinstance(value, Tensor):
            new_val = value.clone().detach()
        else:
            new_val = copy.deepcopy(value)
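
The suggestion in this comment (skip the lock during deepcopy, and hold it while copying) could look roughly like the sketch below. `PartitionState` and its fields are hypothetical stand-ins, and plain lists replace tensors so the example runs without torch; only `to_snapshot` and `data_status_lock` are names from the review.

```python
import copy
import threading


class PartitionState:
    """Hypothetical stand-in for the partition object discussed above."""

    def __init__(self):
        self.data_status_lock = threading.Lock()
        self.production_status = [[0, 0], [0, 0]]  # plain lists instead of tensors
        self.keys_mapping = {"a": 0}

    def to_snapshot(self):
        # Bypass __init__ so the snapshot only carries copied attributes.
        snapshot = PartitionState.__new__(PartitionState)
        # Hold the lock so writers cannot change state in the middle of the copy.
        with self.data_status_lock:
            for name, value in self.__dict__.items():
                if name == "data_status_lock":
                    continue  # threading.Lock objects cannot be deep-copied
                setattr(snapshot, name, copy.deepcopy(value))
        # Give the snapshot its own, independent lock.
        snapshot.data_status_lock = threading.Lock()
        return snapshot


state = PartitionState()
snap = state.to_snapshot()
state.production_status[0][0] = 1  # a later write must not leak into the snapshot
```

Writers would need to take the same lock for the guard to be effective; the sketch only shows the snapshot side.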
      from itertools import groupby
      from operator import itemgetter
    - from threading import Lock, Thread
    + from threading import Thread
Comment on lines 360 to 363

    # User-defined Keys
    keys_mapping: dict[str, int] = field(default_factory=dict)  # key -> global_idx
    revert_keys_mapping: dict[int, str] = field(default_factory=dict)  # global_idx -> key
Comment on lines 404 to +408

      # Expand the state matrices
      max_sample_idx = max(allocated_indexes)
      required_samples = max_sample_idx + 1

    - with self.data_status_lock:
    -     self.ensure_samples_capacity(required_samples)
    + self.ensure_samples_capacity(required_samples)
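
The change above concerns the `with self.data_status_lock:` guard around `ensure_samples_capacity`. As a minimal sketch of why such a guard matters when several threads can grow the state, assuming a hypothetical `StatusTable` class with a plain list in place of the state matrices:

```python
import threading


class StatusTable:
    """Hypothetical growable table; `rows` stands in for the state matrices."""

    def __init__(self):
        self.rows = []
        self.data_status_lock = threading.Lock()

    def ensure_samples_capacity(self, required_samples):
        # Check-then-act: the length check and the extend must not interleave
        # with another thread's growth, so callers hold data_status_lock.
        missing = required_samples - len(self.rows)
        if missing > 0:
            self.rows.extend([0] * missing)

    def mark_ready(self, allocated_indexes):
        required_samples = max(allocated_indexes) + 1
        with self.data_status_lock:
            self.ensure_samples_capacity(required_samples)
            for idx in allocated_indexes:
                self.rows[idx] = 1


table = StatusTable()
workers = [
    threading.Thread(target=table.mark_ready, args=(list(range(start, start + 50)),))
    for start in range(0, 500, 50)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
```

If every caller runs on one thread, as the review overview describes for the reviewed revision, the lock is redundant; it becomes necessary again as soon as a second thread can call `mark_ready` or read `rows`.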
Comment on lines 520 to +524

      # Determine required capacity
      max_sample_idx = max(global_indices) if global_indices else -1
      required_samples = max_sample_idx + 1

    - with self.data_status_lock:
    -     # Ensure we have enough rows
    -     self.ensure_samples_capacity(required_samples)
    + # Ensure we have enough rows
Comment on lines 632 to +636

      if mask:
          if partition_global_index.numel() == 0:
              empty_status = self.consumption_status[task_name].new_zeros(0)
              return partition_global_index, empty_status
    -     with self.data_status_lock:
    -         self.ensure_samples_capacity(max(partition_global_index) + 1)
    +     self.ensure_samples_capacity(max(partition_global_index) + 1)
Comment on lines +724 to +728

    + row_mask = torch.ones(self.allocated_samples_num, dtype=torch.bool)

    - # Apply consumption filter (exclude already consumed samples)
    - _, consumption_status = self.get_consumption_status(task_name, mask=False)
    - if consumption_status is not None:
    -     unconsumed_mask = consumption_status == 0
    -     row_mask &= unconsumed_mask
    + # Apply consumption filter (exclude already consumed samples)
    + _, consumption_status = self.get_consumption_status(task_name, mask=False)
    + if consumption_status is not None:
Comment on lines 1003 to 1005

      # Start background processing threads
      self._start_process_handshake()
    - self._start_process_update_data_status()
      self._start_process_request()
No description provided.