diff --git a/csharp/Platform.Memory.Tests/DistributedDirectMemoryTests.cs b/csharp/Platform.Memory.Tests/DistributedDirectMemoryTests.cs new file mode 100644 index 0000000..c47976d --- /dev/null +++ b/csharp/Platform.Memory.Tests/DistributedDirectMemoryTests.cs @@ -0,0 +1,368 @@ +using System; +using System.IO; +using System.Linq; +using Xunit; + +namespace Platform.Memory.Tests +{ + public unsafe class DistributedDirectMemoryTests + { + private readonly string[] _testPaths; + private readonly string _testDirectory; + + public DistributedDirectMemoryTests() + { + _testDirectory = Path.Combine(Path.GetTempPath(), "DistributedMemoryTests", Guid.NewGuid().ToString()); + _testPaths = new[] + { + Path.Combine(_testDirectory, "drive1"), + Path.Combine(_testDirectory, "drive2"), + Path.Combine(_testDirectory, "drive3") + }; + + foreach (var path in _testPaths) + { + Directory.CreateDirectory(path); + } + } + + private void Cleanup() + { + try + { + if (Directory.Exists(_testDirectory)) + { + Directory.Delete(_testDirectory, recursive: true); + } + } + catch + { + // Ignore cleanup errors + } + } + + [Fact] + public void ConstructorCreatesDirectoriesTest() + { + var tempPaths = new[] + { + Path.Combine(Path.GetTempPath(), "DistributedMemoryTest1", Guid.NewGuid().ToString()), + Path.Combine(Path.GetTempPath(), "DistributedMemoryTest2", Guid.NewGuid().ToString()) + }; + + try + { + using var memory = new DistributedDirectMemory(tempPaths, "test"); + + foreach (var path in tempPaths) + { + Assert.True(Directory.Exists(path)); + } + } + finally + { + foreach (var path in tempPaths) + { + try + { + if (Directory.Exists(path)) + { + Directory.Delete(path, recursive: true); + } + } + catch { } + } + } + } + + [Fact] + public void InitialCapacityTest() + { + using var memory = new DistributedDirectMemory(_testPaths, "test"); + + Assert.True(memory.ReservedCapacity >= ResizableDirectMemoryBase.MinimumCapacity); + Assert.Equal(0, memory.UsedCapacity); + Assert.True(memory.Size == memory.UsedCapacity); + + Cleanup(); + } + + [Fact] + public void CapacityResizingTest() + { + using var memory = new DistributedDirectMemory(_testPaths, "test"); + + var initialCapacity = memory.ReservedCapacity; + memory.ReservedCapacity = initialCapacity * 2; + + Assert.Equal(initialCapacity * 2, memory.ReservedCapacity); + Assert.Equal(0, memory.UsedCapacity); + + Cleanup(); + } + + [Fact] + public void BasicWriteAndReadTest() + { + using var memory = new DistributedDirectMemory(_testPaths, "test"); + + var testData = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + var readBuffer = new byte[testData.Length]; + + memory.WriteBytes(0, testData, 0, testData.Length); + memory.ReadBytes(0, readBuffer, 0, readBuffer.Length); + + Assert.Equal(testData, readBuffer); + Assert.Equal(testData.Length, memory.UsedCapacity); + + Cleanup(); + } + + [Fact] + public void CrossSegmentWriteAndReadTest() + { + var segmentSize = ResizableDirectMemoryBase.MinimumCapacity; + using var memory = new DistributedDirectMemory(_testPaths, "test", segmentSize, segmentSize * 2); + + // Create data that spans multiple segments + var testData = new byte[segmentSize + 1000]; + for (int i = 0; i < testData.Length; i++) + { + testData[i] = (byte)(i % 256); + } + + var readBuffer = new byte[testData.Length]; + + memory.WriteBytes(0, testData, 0, testData.Length); + memory.ReadBytes(0, readBuffer, 0, readBuffer.Length); + + Assert.Equal(testData, readBuffer); + Assert.Equal(testData.Length, memory.UsedCapacity); + + Cleanup(); + } + + [Fact] + public void MultipleSegmentDistributionTest() + { + var segmentSize = ResizableDirectMemoryBase.MinimumCapacity; + using var memory = new DistributedDirectMemory(_testPaths, "test", segmentSize, segmentSize * 3); + + // Write data to force creation of multiple segments + var testData = new byte[segmentSize * 2 + 500]; + for (int i = 0; i < testData.Length; i++) + { + testData[i] = (byte)(i % 256); + } + + memory.WriteBytes(0, testData, 0, testData.Length); + + // Verify segments are distributed across different paths + var segmentFiles = _testPaths + .SelectMany(path => Directory.GetFiles(path, "test_segment_*.dat")) + .ToArray(); + + Assert.True(segmentFiles.Length >= 2); + + // Verify files are distributed across different drives + var driveDistribution = segmentFiles + .GroupBy(file => Path.GetDirectoryName(file)) + .Count(); + + Assert.True(driveDistribution > 1); + + Cleanup(); + } + + [Fact] + public void RandomAccessTest() + { + var segmentSize = ResizableDirectMemoryBase.MinimumCapacity; + using var memory = new DistributedDirectMemory(_testPaths, "test", segmentSize, 20000); + + var random = new System.Random(42); + var testData = new byte[10000]; + random.NextBytes(testData); + + // Write the entire test data first to ensure it's all available + memory.WriteBytes(0, testData, 0, testData.Length); + + // Test random access reads + for (int i = 0; i < 50; i++) + { + var offset = random.Next(0, testData.Length - 100); + var length = random.Next(1, Math.Min(100, testData.Length - offset)); + var readBuffer = new byte[length]; + + memory.ReadBytes(offset, readBuffer, 0, length); + + for (int j = 0; j < length; j++) + { + Assert.Equal(testData[offset + j], readBuffer[j]); + } + } + + // Test random overwrites + var random2 = new System.Random(123); + for (int i = 0; i < 20; i++) + { + var offset = random2.Next(0, testData.Length - 50); + var length = random2.Next(1, Math.Min(50, testData.Length - offset)); + var newData = new byte[length]; + random2.NextBytes(newData); + + memory.WriteBytes(offset, newData, 0, length); + + // Update our test data array to match what we wrote + Array.Copy(newData, 0, testData, offset, length); + + // Verify the write + var readBuffer = new byte[length]; + memory.ReadBytes(offset, readBuffer, 0, length); + Assert.Equal(newData, readBuffer); + } + + Cleanup(); + } + + [Fact] + public void GetPointerAtTest() + { + using var memory = new DistributedDirectMemory(_testPaths, "test"); + + var testData = new byte[] { 42, 84, 126 }; + memory.WriteBytes(0, testData, 0, testData.Length); + + for (int i = 0; i < testData.Length; i++) + { + var pointer = memory.GetPointerAt(i); + var value = *(byte*)pointer; + Assert.Equal(testData[i], value); + } + + Cleanup(); + } + + [Fact] + public void CapacityReductionTest() + { + var segmentSize = ResizableDirectMemoryBase.MinimumCapacity; + using var memory = new DistributedDirectMemory(_testPaths, "test", segmentSize, segmentSize * 3); + + var initialCapacity = memory.ReservedCapacity; + + // Write some data to multiple segments + var testData = new byte[segmentSize / 2]; + for (int i = 0; i < testData.Length; i++) + { + testData[i] = (byte)(i % 256); + } + memory.WriteBytes(0, testData, 0, testData.Length); + + // Get initial file count + var initialFiles = _testPaths + .SelectMany(path => Directory.GetFiles(path, "test_segment_*.dat")) + .Count(); + + // Reduce capacity to something larger than used capacity but smaller than original + memory.ReservedCapacity = segmentSize * 2; + + // Verify files are cleaned up + var finalFiles = _testPaths + .SelectMany(path => Directory.GetFiles(path, "test_segment_*.dat")) + .Count(); + + Assert.True(finalFiles <= initialFiles); + Assert.Equal(segmentSize * 2, memory.ReservedCapacity); + + Cleanup(); + } + + [Fact] + public void DisposalCleansUpResourcesTest() + { + var segmentSize = ResizableDirectMemoryBase.MinimumCapacity; + var memory = new DistributedDirectMemory(_testPaths, "disposal_test", segmentSize, segmentSize * 2); + + // Create some segments by writing data + var testData = new byte[segmentSize + 1000]; + memory.WriteBytes(0, testData, 0, testData.Length); + + // Verify segments exist + var filesBeforeDisposal = _testPaths + .SelectMany(path => Directory.GetFiles(path, "disposal_test_segment_*.dat")) + .Count(); + + Assert.True(filesBeforeDisposal > 0); + + memory.Dispose(); + + // Files should still exist after disposal (they contain data) + // but should be properly sized to used capacity + var filesAfterDisposal = _testPaths + .SelectMany(path => Directory.GetFiles(path, "disposal_test_segment_*.dat")) + .Count(); + + // Verify no exception is thrown on subsequent disposal + memory.Dispose(); + + Cleanup(); + } + + [Fact] + public void ArgumentValidationTest() + { + Assert.Throws(() => + new DistributedDirectMemory(null!, "test")); + + Assert.Throws(() => + new DistributedDirectMemory(new string[0], "test")); + + Assert.Throws(() => + new DistributedDirectMemory(new[] { "" }, "test")); + + Assert.Throws(() => + new DistributedDirectMemory(_testPaths, "")); + + using var memory = new DistributedDirectMemory(_testPaths, "test"); + + Assert.Throws(() => + memory.ReadBytes(0, null!, 0, 10)); + + Assert.Throws(() => + memory.WriteBytes(0, null!, 0, 10)); + + Cleanup(); + } + + [Fact] + public void LargeDataTest() + { + var segmentSize = ResizableDirectMemoryBase.MinimumCapacity; + using var memory = new DistributedDirectMemory(_testPaths, "large_test", segmentSize, segmentSize * 5); + + // Create data larger than single segment + var dataSize = segmentSize * 3 + 12345; + var largeData = new byte[dataSize]; + + // Ensure capacity is large enough + if (memory.ReservedCapacity < dataSize) + { + memory.ReservedCapacity = dataSize; + } + + var random = new System.Random(123); + random.NextBytes(largeData); + + memory.WriteBytes(0, largeData, 0, largeData.Length); + + var readBuffer = new byte[dataSize]; + memory.ReadBytes(0, readBuffer, 0, (int)dataSize); + + Assert.Equal(largeData, readBuffer); + Assert.Equal(dataSize, memory.UsedCapacity); + + Cleanup(); + } + } +} \ No newline at end of file diff --git a/csharp/Platform.Memory/DistributedDirectMemory.cs b/csharp/Platform.Memory/DistributedDirectMemory.cs new file mode 100644 index 0000000..44619cc --- /dev/null +++ b/csharp/Platform.Memory/DistributedDirectMemory.cs @@ -0,0 +1,337 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Runtime.CompilerServices; +using Platform.Exceptions; +using Platform.Disposables; +using Platform.Collections; +using Platform.Ranges; + +namespace Platform.Memory +{ + /// + /// Represents a memory block distributed across multiple storage locations (drives or machines). + /// Представляет блок памяти, распределённый по нескольким местам хранения (дискам или машинам). + /// + public unsafe class DistributedDirectMemory : ResizableDirectMemoryBase + { + #region Fields + private readonly List _segments; + private readonly List _segmentPaths; + private readonly string[] _basePaths; + private readonly long _segmentSize; + private readonly string _namePrefix; + + #endregion + + #region DisposableBase Properties + + /// + protected override string ObjectName + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => $"Distributed memory block across {_basePaths.Length} locations with {_segments.Count} segments."; + } + + #endregion + + #region Constructors + + /// + /// Initializes a new instance of the class. + /// Инициализирует новый экземпляр класса . + /// + /// Array of base directory paths where memory segments will be stored.Массив базовых путей к директориям, где будут храниться сегменты памяти. + /// Prefix for segment file names.Префикс для имён файлов сегментов. + /// Size of each memory segment in bytes.Размер каждого сегмента памяти в байтах. + /// Minimum reserved capacity in bytes.Минимальный зарезервированный размер в байтах. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public DistributedDirectMemory(string[] basePaths, string namePrefix, long segmentSize, long minimumReservedCapacity) + { + Ensure.Always.ArgumentNotNull(basePaths, nameof(basePaths)); + Ensure.Always.ArgumentNotEmptyAndNotWhiteSpace(namePrefix, nameof(namePrefix)); + Ensure.Always.ArgumentInRange(basePaths.Length, new Range(1, int.MaxValue), nameof(basePaths)); + + if (minimumReservedCapacity < MinimumCapacity) + { + minimumReservedCapacity = MinimumCapacity; + } + if (segmentSize < MinimumCapacity) + { + segmentSize = MinimumCapacity; + } + + // Ensure all base paths exist + foreach (var basePath in basePaths) + { + Ensure.Always.ArgumentNotEmptyAndNotWhiteSpace(basePath, nameof(basePaths)); + Directory.CreateDirectory(basePath); + } + + _basePaths = (string[])basePaths.Clone(); + _namePrefix = namePrefix; + _segmentSize = segmentSize; + _segments = new List(); + _segmentPaths = new List(); + + ReservedCapacity = minimumReservedCapacity; + UsedCapacity = 0; + } + + /// + /// Initializes a new instance of the class. + /// Инициализирует новый экземпляр класса . + /// + /// Array of base directory paths where memory segments will be stored.Массив базовых путей к директориям, где будут храниться сегменты памяти. + /// Prefix for segment file names.Префикс для имён файлов сегментов. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public DistributedDirectMemory(string[] basePaths, string namePrefix) + : this(basePaths, namePrefix, MinimumCapacity * 16, MinimumCapacity) { } + + #endregion + + #region Methods + + /// + /// Gets the segment index and offset within segment for the given absolute offset. + /// Получает индекс сегмента и смещение внутри сегмента для заданного абсолютного смещения. + /// + /// Absolute offset in bytes.Абсолютное смещение в байтах. + /// Index of the segment.Индекс сегмента. + /// Offset within the segment.Смещение внутри сегмента. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void GetSegmentLocation(long absoluteOffset, out int segmentIndex, out long segmentOffset) + { + segmentIndex = (int)(absoluteOffset / _segmentSize); + segmentOffset = absoluteOffset % _segmentSize; + } + + /// + /// Ensures that the specified segment exists. + /// Обеспечивает существование указанного сегмента. + /// + /// Index of the segment to ensure.Индекс сегмента для обеспечения. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void EnsureSegment(int segmentIndex) + { + while (_segments.Count <= segmentIndex) + { + var pathIndex = _segments.Count % _basePaths.Length; + var segmentFileName = $"{_namePrefix}_segment_{_segments.Count:D8}.dat"; + var segmentPath = Path.Combine(_basePaths[pathIndex], segmentFileName); + + var segment = new FileMappedResizableDirectMemory(segmentPath, _segmentSize); + _segments.Add(segment); + _segmentPaths.Add(segmentPath); + } + } + + /// + /// Gets a pointer to the specified absolute offset in the distributed memory. + /// Получает указатель на указанное абсолютное смещение в распределённой памяти. + /// + /// Absolute offset in bytes.Абсолютное смещение в байтах. + /// Pointer to the memory location.Указатель на место в памяти. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public IntPtr GetPointerAt(long absoluteOffset) + { + Ensure.Always.NotDisposed(this); + Ensure.Always.ArgumentInRange(absoluteOffset, new Range(0, ReservedCapacity - 1)); + + GetSegmentLocation(absoluteOffset, out int segmentIndex, out long segmentOffset); + EnsureSegment(segmentIndex); + + return new IntPtr(_segments[segmentIndex].Pointer.ToInt64() + segmentOffset); + } + + /// + /// Reads data from the distributed memory at the specified offset. + /// Читает данные из распределённой памяти по указанному смещению. + /// + /// Absolute offset to read from.Абсолютное смещение для чтения. + /// Buffer to read data into.Буфер для чтения данных. + /// Offset in the buffer to start writing.Смещение в буфере для начала записи. + /// Number of bytes to read.Количество байт для чтения. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void ReadBytes(long absoluteOffset, byte[] buffer, int bufferOffset, int count) + { + Ensure.Always.NotDisposed(this); + Ensure.Always.ArgumentNotNull(buffer, nameof(buffer)); + Ensure.Always.ArgumentInRange(absoluteOffset, new Range(0, UsedCapacity - count)); + Ensure.Always.ArgumentInRange(bufferOffset, new Range(0, buffer.Length - count)); + + var remainingBytes = count; + var currentOffset = absoluteOffset; + var currentBufferOffset = bufferOffset; + + while (remainingBytes > 0) + { + GetSegmentLocation(currentOffset, out int segmentIndex, out long segmentOffset); + EnsureSegment(segmentIndex); + + var bytesToReadFromSegment = Math.Min(remainingBytes, (int)(_segmentSize - segmentOffset)); + var sourcePtr = (byte*)(_segments[segmentIndex].Pointer.ToInt64() + segmentOffset); + + fixed (byte* destPtr = &buffer[currentBufferOffset]) + { + for (int i = 0; i < bytesToReadFromSegment; i++) + { + destPtr[i] = sourcePtr[i]; + } + } + + remainingBytes -= bytesToReadFromSegment; + currentOffset += bytesToReadFromSegment; + currentBufferOffset += bytesToReadFromSegment; + } + } + + /// + /// Writes data to the distributed memory at the specified offset. + /// Записывает данные в распределённую память по указанному смещению. + /// + /// Absolute offset to write to.Абсолютное смещение для записи. + /// Buffer containing data to write.Буфер, содержащий данные для записи. + /// Offset in the buffer to start reading.Смещение в буфере для начала чтения. + /// Number of bytes to write.Количество байт для записи. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void WriteBytes(long absoluteOffset, byte[] buffer, int bufferOffset, int count) + { + Ensure.Always.NotDisposed(this); + Ensure.Always.ArgumentNotNull(buffer, nameof(buffer)); + Ensure.Always.ArgumentInRange(absoluteOffset, new Range(0, ReservedCapacity - count)); + Ensure.Always.ArgumentInRange(bufferOffset, new Range(0, buffer.Length - count)); + + var remainingBytes = count; + var currentOffset = absoluteOffset; + var currentBufferOffset = bufferOffset; + + while (remainingBytes > 0) + { + GetSegmentLocation(currentOffset, out int segmentIndex, out long segmentOffset); + EnsureSegment(segmentIndex); + + var bytesToWriteToSegment = Math.Min(remainingBytes, (int)(_segmentSize - segmentOffset)); + var destPtr = (byte*)(_segments[segmentIndex].Pointer.ToInt64() + segmentOffset); + + fixed (byte* sourcePtr = &buffer[currentBufferOffset]) + { + for (int i = 0; i < bytesToWriteToSegment; i++) + { + destPtr[i] = sourcePtr[i]; + } + } + + remainingBytes -= bytesToWriteToSegment; + currentOffset += bytesToWriteToSegment; + currentBufferOffset += bytesToWriteToSegment; + } + + // Update used capacity if we wrote beyond current used capacity + var newUsedCapacity = Math.Max(UsedCapacity, absoluteOffset + count); + if (newUsedCapacity > UsedCapacity) + { + UsedCapacity = newUsedCapacity; + } + } + + #endregion + + #region ResizableDirectMemoryBase Methods + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + protected override void OnReservedCapacityChanged(long oldReservedCapacity, long newReservedCapacity) + { + var requiredSegments = (int)((newReservedCapacity + _segmentSize - 1) / _segmentSize); + + // Ensure we have enough segments + EnsureSegment(requiredSegments - 1); + + // Resize the last segment if necessary + if (requiredSegments > 0) + { + var lastSegmentIndex = requiredSegments - 1; + var lastSegmentSize = newReservedCapacity - (lastSegmentIndex * _segmentSize); + if (lastSegmentSize != _segmentSize && lastSegmentIndex < _segments.Count) + { + _segments[lastSegmentIndex].ReservedCapacity = Math.Max(lastSegmentSize, MinimumCapacity); + } + } + + // Remove excess segments if shrinking + while (_segments.Count > requiredSegments) + { + var lastIndex = _segments.Count - 1; + _segments[lastIndex].Dispose(); + _segments.RemoveAt(lastIndex); + + // Clean up segment file + var segmentPath = _segmentPaths[lastIndex]; + _segmentPaths.RemoveAt(lastIndex); + try + { + if (File.Exists(segmentPath)) + { + File.Delete(segmentPath); + } + } + catch + { + // Ignore cleanup errors + } + } + + // Update pointer to first segment if available + if (_segments.Count > 0) + { + Pointer = _segments[0].Pointer; + } + else + { + Pointer = IntPtr.Zero; + } + } + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + protected override void DisposePointer(IntPtr pointer, long usedCapacity) + { + // Dispose all segments + foreach (var segment in _segments) + { + try + { + segment?.Dispose(); + } + catch + { + // Ignore disposal errors + } + } + _segments.Clear(); + + // Clean up segment files if they're empty or not needed + for (int i = 0; i < _segmentPaths.Count; i++) + { + try + { + var segmentPath = _segmentPaths[i]; + var segmentUsedCapacity = Math.Max(0, Math.Min(_segmentSize, usedCapacity - (i * _segmentSize))); + + if (segmentUsedCapacity == 0 && File.Exists(segmentPath)) + { + File.Delete(segmentPath); + } + } + catch + { + // Ignore cleanup errors + } + } + _segmentPaths.Clear(); + } + + #endregion + } +} \ No newline at end of file diff --git a/csharp/Platform.Memory/Platform.Memory.csproj b/csharp/Platform.Memory/Platform.Memory.csproj index 6363b22..65811fb 100644 --- a/csharp/Platform.Memory/Platform.Memory.csproj +++ b/csharp/Platform.Memory/Platform.Memory.csproj @@ -4,13 +4,13 @@ LinksPlatform's Platform.Memory Class Library Konstantin Diachenko Platform.Memory - 0.4.1 + 0.5.0 Konstantin Diachenko net8 true Platform.Memory Platform.Memory - LinksPlatform;Memory;ArrayMemory;DirectMemoryAsArrayMemoryAdapter;FileArrayMemory;FileMappedResizableDirectMemory;HeapResizableDirectMemory;IArrayMemory;IDirectMemory;IMemory;IResizableDirectMemory;ResizableDirectMemoryBase;TemporaryFileMappedResizableDirectMemory;MemoryMappedFiles + LinksPlatform;Memory;ArrayMemory;DirectMemoryAsArrayMemoryAdapter;FileArrayMemory;FileMappedResizableDirectMemory;HeapResizableDirectMemory;DistributedDirectMemory;IArrayMemory;IDirectMemory;IMemory;IResizableDirectMemory;ResizableDirectMemoryBase;TemporaryFileMappedResizableDirectMemory;MemoryMappedFiles https://raw.githubusercontent.com/linksplatform/Documentation/18469f4d033ee9a5b7b84caab9c585acab2ac519/doc/Avatar-rainbow-icon-64x64.png https://linksplatform.github.io/Memory Unlicense @@ -24,12 +24,16 @@ true snupkg latest - Update target framework from net7 to net8. + Add DistributedDirectMemory class for multi-drive and multi-machine memory distribution. enable + + + +