Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install flake8 pytest build pytest_asyncio
python -m pip install flake8 pytest build pytest_asyncio pytest-mock openyuanrong-datasystem

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may place yuanrong and test dependencies in https://github.com/Ascend/TransferQueue/blob/main/pyproject.toml

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do it in next PR.

python -m build --wheel
pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu
pip install dist/*.whl
Expand Down
8 changes: 4 additions & 4 deletions tests/test_yuanrong_storage_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MockBuffer:
def __init__(self, size):
self.data = bytearray(size)

def mutable_data(self):
def MutableData(self):
return self.data


Expand Down Expand Up @@ -69,15 +69,15 @@ def mock_deserialization(items):
except UnicodeDecodeError:
return data

mocker.patch("transfer_queue.storage.clients.yuanrong_client.serialization", side_effect=mock_serialization)
mocker.patch("transfer_queue.storage.clients.yuanrong_client.deserialization", side_effect=mock_deserialization)
mocker.patch("transfer_queue.storage.clients.yuanrong_client._encoder.encode", side_effect=mock_serialization)
mocker.patch("transfer_queue.storage.clients.yuanrong_client._decoder.decode", side_effect=mock_deserialization)

stored_raw_buffers = []

def side_effect_mcreate(keys, sizes):
buffers = [MockBuffer(size) for size in sizes]
for b in buffers:
stored_raw_buffers.append(b.mutable_data())
stored_raw_buffers.append(b.MutableData())
return 0, buffers

storage_client._cpu_ds_client.mcreate.side_effect = side_effect_mcreate
Expand Down
14 changes: 14 additions & 0 deletions transfer_queue/storage/clients/yuanrong_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ def _create_empty_npu_tensorlist(self, shapes, dtypes):
return tensors

def mset_zcopy(self, keys: list[str], objs: list[Any]):
"""Store multiple objects in zero-copy mode using parallel serialization and buffer packing.

Args:
keys (list[str]): List of string keys under which the objects will be stored.
objs (list[Any]): List of Python objects to store (e.g., tensors, strings).
"""
items_list = [[memoryview(b) for b in _encoder.encode(obj)] for obj in objs]
packed_sizes = [calc_packed_size(items) for items in items_list]
status, buffers = self._cpu_ds_client.mcreate(keys, packed_sizes)
Expand All @@ -194,6 +200,14 @@ def mset_zcopy(self, keys: list[str], objs: list[Any]):
self._cpu_ds_client.mset_buffer(buffers)

def mget_zcopy(self, keys: list[str]) -> list[Any]:
"""Retrieve multiple objects in zero-copy mode by directly deserializing from shared memory buffers.

Args:
keys (list[str]): List of string keys to retrieve from storage.

Returns:
list[Any]: List of deserialized objects corresponding to the input keys.
"""
status, buffers = self._cpu_ds_client.get_buffers(keys, timeout_ms=500)
return [_decoder.decode(unpack_from(buffer)) if buffer is not None else None for buffer in buffers]

Expand Down