From 01606ff36cb8cac6130717a84cf7a2c23811b5e9 Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Mon, 26 Jan 2026 15:27:57 +0800 Subject: [PATCH 1/3] add ds zero copy in CPU Tensor Signed-off-by: Evelynn-V --- .github/workflows/python-package.yml | 2 +- tests/test_yuanrong_storage_manager.py | 8 ++++---- transfer_queue/storage/clients/yuanrong_client.py | 14 ++++++++++++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 2917c789..cadce432 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -31,7 +31,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install flake8 pytest build pytest_asyncio + python -m pip install flake8 pytest build pytest_asyncio pytest-mock python -m build --wheel pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu pip install dist/*.whl diff --git a/tests/test_yuanrong_storage_manager.py b/tests/test_yuanrong_storage_manager.py index 141d9dc9..4c38b7cc 100644 --- a/tests/test_yuanrong_storage_manager.py +++ b/tests/test_yuanrong_storage_manager.py @@ -33,7 +33,7 @@ class MockBuffer: def __init__(self, size): self.data = bytearray(size) - def mutable_data(self): + def MutableData(self): return self.data @@ -69,15 +69,15 @@ def mock_deserialization(items): except UnicodeDecodeError: return data - mocker.patch("transfer_queue.storage.clients.yuanrong_client.serialization", side_effect=mock_serialization) - mocker.patch("transfer_queue.storage.clients.yuanrong_client.deserialization", side_effect=mock_deserialization) + mocker.patch("transfer_queue.storage.clients.yuanrong_client._encoder.encode", side_effect=mock_serialization) + mocker.patch("transfer_queue.storage.clients.yuanrong_client._decoder.decode", side_effect=mock_deserialization) stored_raw_buffers = [] def side_effect_mcreate(keys, sizes): buffers = [MockBuffer(size) for size in sizes] for b in buffers: - stored_raw_buffers.append(b.mutable_data()) + stored_raw_buffers.append(b.MutableData()) return 0, buffers storage_client._cpu_ds_client.mcreate.side_effect = side_effect_mcreate diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 5fa52849..c2334729 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -185,6 +185,12 @@ def _create_empty_npu_tensorlist(self, shapes, dtypes): return tensors def mset_zcopy(self, keys: list[str], objs: list[Any]): + """Store multiple objects in zero-copy mode using parallel serialization and buffer packing. + + Args: + keys (list[str]): List of string keys under which the objects will be stored. + objs (list[Any]): List of Python objects to store (e.g., tensors, strings). + """ items_list = [[memoryview(b) for b in _encoder.encode(obj)] for obj in objs] packed_sizes = [calc_packed_size(items) for items in items_list] status, buffers = self._cpu_ds_client.mcreate(keys, packed_sizes) @@ -194,6 +200,14 @@ def mset_zcopy(self, keys: list[str], objs: list[Any]): self._cpu_ds_client.mset_buffer(buffers) def mget_zcopy(self, keys: list[str]) -> list[Any]: + """Retrieve multiple objects in zero-copy mode by directly deserializing from shared memory buffers. + + Args: + keys (list[str]): List of string keys to retrieve from storage. + + Returns: + list[Any]: List of deserialized objects corresponding to the input keys. + """ status, buffers = self._cpu_ds_client.get_buffers(keys, timeout_ms=500) return [_decoder.decode(unpack_from(buffer)) if buffer is not None else None for buffer in buffers] From 2fad85083fa38b63952eabcabba153ee06b7c575 Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Mon, 26 Jan 2026 16:19:43 +0800 Subject: [PATCH 2/3] UT avoids relying on yr Signed-off-by: Evelynn-V --- tests/test_yuanrong_storage_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_yuanrong_storage_manager.py b/tests/test_yuanrong_storage_manager.py index 4c38b7cc..934dcfca 100644 --- a/tests/test_yuanrong_storage_manager.py +++ b/tests/test_yuanrong_storage_manager.py @@ -43,8 +43,8 @@ def mock_kv_client(self, mocker): mock_client = MagicMock() mock_client.init.return_value = None - mocker.patch("yr.datasystem.KVClient", return_value=mock_client) - mocker.patch("yr.datasystem.DsTensorClient") + mocker.patch("transfer_queue.storage.clients.yuanrong_client.datasystem.KVClient", return_value=mock_client) + mocker.patch("transfer_queue.storage.clients.yuanrong_client.datasystem.DsTensorClient") mocker.patch("transfer_queue.storage.clients.yuanrong_client.TORCH_NPU_IMPORTED", False) return mock_client From 2fc6f570e751ff8d50c0a874b8548c639ab1400b Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Mon, 26 Jan 2026 16:28:24 +0800 Subject: [PATCH 3/3] CI add install yr Signed-off-by: Evelynn-V --- .github/workflows/python-package.yml | 2 +- tests/test_yuanrong_storage_manager.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index cadce432..bd74cff3 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -31,7 +31,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install flake8 pytest build pytest_asyncio pytest-mock + python -m pip install flake8 pytest build pytest_asyncio pytest-mock openyuanrong-datasystem python -m build --wheel pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu pip install dist/*.whl diff --git a/tests/test_yuanrong_storage_manager.py b/tests/test_yuanrong_storage_manager.py index 934dcfca..4c38b7cc 100644 --- a/tests/test_yuanrong_storage_manager.py +++ b/tests/test_yuanrong_storage_manager.py @@ -43,8 +43,8 @@ def mock_kv_client(self, mocker): mock_client = MagicMock() mock_client.init.return_value = None - mocker.patch("transfer_queue.storage.clients.yuanrong_client.datasystem.KVClient", return_value=mock_client) - mocker.patch("transfer_queue.storage.clients.yuanrong_client.datasystem.DsTensorClient") + mocker.patch("yr.datasystem.KVClient", return_value=mock_client) + mocker.patch("yr.datasystem.DsTensorClient") mocker.patch("transfer_queue.storage.clients.yuanrong_client.TORCH_NPU_IMPORTED", False) return mock_client