diff --git a/pyproject.toml b/pyproject.toml index 0ee9aa6..2042f91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,3 +10,8 @@ build-backend = "setuptools.build_meta" [tool.ruff] select = ["E", "F"] ignore = ["E731", "E501"] + +[tool.pytest.ini_options] +markers = [ + "slow: marks tests as slow (deselect with '-m \"not slow\"')", +] diff --git a/tests/test_vcd_sparse.py b/tests/test_vcd_sparse.py new file mode 100644 index 0000000..fc7bd53 --- /dev/null +++ b/tests/test_vcd_sparse.py @@ -0,0 +1,151 @@ +'''Tests for the sparse VCD parser.''' +import pytest +from pathlib import Path + +from wal.trace.container import TraceContainer +from wal.trace.vcd import TraceVcd +from wal.trace.vcd_sparse import TraceVcdSparse + + +TRACES_DIR = Path(__file__).parent / 'traces' + + +class TestVcdSparseBasic: + '''Basic functionality tests for sparse VCD parser.''' + + def test_load_sparse_vcd(self): + '''Test that sparse VCD loads without errors.''' + tc = TraceContainer() + tc.load(str(TRACES_DIR / 'counter.vcd'), tid='test', sparse=True) + assert 'test' in tc.traces + assert isinstance(tc.traces['test'], TraceVcdSparse) + + def test_load_dense_vcd(self): + '''Test that dense VCD still works.''' + tc = TraceContainer() + tc.load(str(TRACES_DIR / 'counter.vcd'), tid='test', sparse=False) + assert 'test' in tc.traces + assert isinstance(tc.traces['test'], TraceVcd) + + def test_signals_match(self): + '''Test that both parsers find the same signals.''' + tc_dense = TraceContainer() + tc_dense.load(str(TRACES_DIR / 'counter.vcd'), tid='t', sparse=False) + + tc_sparse = TraceContainer() + tc_sparse.load(str(TRACES_DIR / 'counter.vcd'), tid='t', sparse=True) + + assert tc_dense.signals == tc_sparse.signals + + def test_scopes_match(self): + '''Test that both parsers find the same scopes.''' + tc_dense = TraceContainer() + tc_dense.load(str(TRACES_DIR / 'counter.vcd'), tid='t', sparse=False) + + tc_sparse = TraceContainer() + tc_sparse.load(str(TRACES_DIR / 'counter.vcd'), 
tid='t', sparse=True) + + assert tc_dense.scopes == tc_sparse.scopes + + +class TestVcdSparseValues: + '''Test that sparse parser returns correct values.''' + + def test_values_match_at_all_indices(self): + '''Test that both parsers return same values at all indices.''' + tc_dense = TraceContainer() + tc_dense.load(str(TRACES_DIR / 'counter.vcd'), tid='t', sparse=False) + trace_dense = tc_dense.traces['t'] + + tc_sparse = TraceContainer() + tc_sparse.load(str(TRACES_DIR / 'counter.vcd'), tid='t', sparse=True) + trace_sparse = tc_sparse.traces['t'] + + # Get all signal names (excluding special signals) + signals = [s for s in trace_dense.rawsignals] + + # Compare values at all indices + for idx in range(trace_dense.max_index + 1): + for signal in signals: + dense_val = trace_dense.access_signal_data(signal, idx) + sparse_val = trace_sparse.access_signal_data(signal, idx) + assert dense_val == sparse_val, \ + f"Mismatch at index {idx}, signal {signal}: dense={dense_val}, sparse={sparse_val}" + + def test_signal_width(self): + '''Test that signal width is correctly reported.''' + tc_sparse = TraceContainer() + tc_sparse.load(str(TRACES_DIR / 'counter.vcd'), tid='t', sparse=True) + trace = tc_sparse.traces['t'] + + # The counter signal should be 4 bits wide + assert trace.signal_width('tb.dut.counter') == 4 + + +class TestVcdSparseMemory: + '''Test memory efficiency of sparse parser.''' + + def test_memory_stats(self): + '''Test that memory_stats returns reasonable values.''' + tc = TraceContainer() + tc.load(str(TRACES_DIR / 'counter.vcd'), tid='t', sparse=True) + trace = tc.traces['t'] + + stats = trace.memory_stats() + + assert stats['n_signals'] > 0 + assert stats['n_timestamps'] > 0 + assert stats['total_changes'] > 0 + assert stats['dense_equivalent'] >= stats['total_changes'] + assert stats['compression_ratio'] >= 1.0 + + def test_sparse_uses_less_entries(self): + '''Test that sparse storage uses fewer entries than dense.''' + tc = TraceContainer() + 
def generate_vcd(n_signals, n_timestamps, change_probability=0.1, seed=42):
    """Generate a synthetic VCD string.

    All signals live in a single ``top`` scope, are named ``sig<i>`` and use
    the identifier code ``s<i>``. Initial values are dumped at ``#0`` inside
    a ``$dumpvars`` section; later timestamps are spaced 10 time units apart.

    Args:
        n_signals: Number of signals to generate
        n_timestamps: Number of simulated time steps, including time 0
        change_probability: Probability that a signal is rewritten at each
            timestamp after the first
        seed: Random seed for reproducibility

    Returns:
        VCD content as string (lines joined with a newline, no trailing
        newline). Time steps at which no signal was rewritten are not
        emitted, so the text may contain fewer than ``n_timestamps``
        ``#<time>`` markers.
    """
    # Draw from a private generator: the output stays reproducible for a given
    # seed, and the module-level RNG that callers may rely on is not reseeded.
    rng = random.Random(seed)

    def random_vector(width):
        return rng.randint(0, (1 << width) - 1)

    def render_change(value, width, sig_id):
        # Scalars are written as "<bit><id>", vectors as "b<bits> <id>".
        if width == 1:
            return f"{value}{sig_id}"
        return f"b{value:0{width}b} {sig_id}"

    # Header
    lines = ["$timescale 1ns $end", "$scope module top $end"]

    # Signal definitions: 70% 1-bit, 20% 8-bit, 10% 32-bit
    signals = []  # (identifier code, width) per signal, in declaration order
    for i in range(n_signals):
        r = rng.random()
        if r < 0.7:
            width = 1
        elif r < 0.9:
            width = 8
        else:
            width = 32
        sig_id = f"s{i}"
        signals.append((sig_id, width))
        lines.append(f"$var wire {width} {sig_id} sig{i} $end")

    lines.append("$upscope $end")
    lines.append("$enddefinitions $end")

    # Initial values at time 0
    lines.append("#0")
    lines.append("$dumpvars")
    current_values = []
    for sig_id, width in signals:
        value = rng.choice([0, 1]) if width == 1 else random_vector(width)
        current_values.append(value)
        lines.append(render_change(value, width, sig_id))
    lines.append("$end")

    # Value changes for the remaining time steps
    for ts in range(1, n_timestamps):
        changes_at_ts = []
        for i, (sig_id, width) in enumerate(signals):
            if rng.random() >= change_probability:
                continue
            if width == 1:
                # Single bits always toggle
                new_value = 1 - current_values[i]
            else:
                # A fresh draw; it may coincide with the previous value
                new_value = random_vector(width)
            current_values[i] = new_value
            changes_at_ts.append(render_change(new_value, width, sig_id))

        # Only emit the timestamp if something was written at it
        if changes_at_ts:
            lines.append(f"#{ts * 10}")  # Timestamps at 10ns intervals
            lines.extend(changes_at_ts)

    return "\n".join(lines)
TestVcdComparisonSmall: + """Small-scale comparison tests for quick validation.""" + + def test_small_simulation_values_match(self): + """Test that both parsers return identical values for small simulation.""" + vcd = generate_vcd(n_signals=10, n_timestamps=100, change_probability=0.2) + + tc_dense = TraceContainer() + trace_dense = TraceVcd(vcd, 'dense', tc_dense, from_string=True) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 'sparse', tc_sparse, from_string=True) + + # Compare all signals at all indices + for signal in trace_dense.rawsignals: + for idx in range(trace_dense.max_index + 1): + dense_val = trace_dense.access_signal_data(signal, idx) + sparse_val = trace_sparse.access_signal_data(signal, idx) + assert dense_val == sparse_val, \ + f"Mismatch at {signal}[{idx}]: dense={dense_val}, sparse={sparse_val}" + + def test_signals_and_scopes_match(self): + """Test that signal and scope lists are identical.""" + vcd = generate_vcd(n_signals=20, n_timestamps=50) + + tc_dense = TraceContainer() + trace_dense = TraceVcd(vcd, 'dense', tc_dense, from_string=True) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 'sparse', tc_sparse, from_string=True) + + assert set(trace_dense.rawsignals) == set(trace_sparse.rawsignals) + assert trace_dense.scopes == trace_sparse.scopes + assert trace_dense.max_index == trace_sparse.max_index + + +class TestVcdComparisonMedium: + """Medium-scale tests with more signals and timestamps.""" + + def test_medium_simulation_100_signals_1000_timestamps(self): + """Test 100 signals over 1000 timestamps.""" + vcd = generate_vcd(n_signals=100, n_timestamps=1000, change_probability=0.05) + + tc_dense = TraceContainer() + trace_dense = TraceVcd(vcd, 'dense', tc_dense, from_string=True) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 'sparse', tc_sparse, from_string=True) + + # Sample check at various indices + test_indices = [0, 1, 10, 100, 500, 999] + for signal in 
trace_dense.rawsignals: + for idx in test_indices: + if idx <= trace_dense.max_index: + dense_val = trace_dense.access_signal_data(signal, idx) + sparse_val = trace_sparse.access_signal_data(signal, idx) + assert dense_val == sparse_val, \ + f"Mismatch at {signal}[{idx}]: dense={dense_val}, sparse={sparse_val}" + + def test_sparse_compression_ratio(self): + """Test that sparse parser achieves good compression with low change rate.""" + # Low change probability = high compression + vcd = generate_vcd(n_signals=100, n_timestamps=1000, change_probability=0.01) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 'sparse', tc_sparse, from_string=True) + + stats = trace_sparse.memory_stats() + + # With 1% change rate, we expect significant compression + assert stats['compression_ratio'] > 5.0, \ + f"Expected compression ratio > 5, got {stats['compression_ratio']:.2f}" + + print(f"\nCompression stats (100 signals, 1000 timestamps, 1% change rate):") + print(f" Signals: {stats['n_signals']}") + print(f" Timestamps: {stats['n_timestamps']}") + print(f" Total changes: {stats['total_changes']}") + print(f" Dense equivalent: {stats['dense_equivalent']}") + print(f" Compression ratio: {stats['compression_ratio']:.2f}x") + + +class TestVcdComparisonLarge: + """Large-scale tests for stress testing.""" + + @pytest.mark.slow + def test_large_simulation_500_signals_10000_timestamps(self): + """Test 500 signals over 10000 timestamps (5M dense entries).""" + vcd = generate_vcd(n_signals=500, n_timestamps=10000, change_probability=0.02) + + tc_dense = TraceContainer() + trace_dense = TraceVcd(vcd, 'dense', tc_dense, from_string=True) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 'sparse', tc_sparse, from_string=True) + + # Verify max_index matches + assert trace_dense.max_index == trace_sparse.max_index + + # Sample random indices and signals for comparison + random.seed(123) + test_signals = random.sample(trace_dense.rawsignals, min(50, 
len(trace_dense.rawsignals))) + test_indices = random.sample(range(trace_dense.max_index + 1), min(100, trace_dense.max_index + 1)) + + for signal in test_signals: + for idx in test_indices: + dense_val = trace_dense.access_signal_data(signal, idx) + sparse_val = trace_sparse.access_signal_data(signal, idx) + assert dense_val == sparse_val, \ + f"Mismatch at {signal}[{idx}]: dense={dense_val}, sparse={sparse_val}" + + # Report compression + stats = trace_sparse.memory_stats() + print(f"\nLarge simulation stats (500 signals, 10000 timestamps):") + print(f" Total changes: {stats['total_changes']:,}") + print(f" Dense equivalent: {stats['dense_equivalent']:,}") + print(f" Compression ratio: {stats['compression_ratio']:.2f}x") + + @pytest.mark.slow + def test_very_large_1000_signals_50000_timestamps(self): + """Test 1000 signals over 50000 timestamps (50M dense entries).""" + vcd = generate_vcd(n_signals=1000, n_timestamps=50000, change_probability=0.005) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 'sparse', tc_sparse, from_string=True) + + stats = trace_sparse.memory_stats() + + print(f"\nVery large simulation stats (1000 signals, 50000 timestamps):") + print(f" Total changes: {stats['total_changes']:,}") + print(f" Dense equivalent: {stats['dense_equivalent']:,}") + print(f" Compression ratio: {stats['compression_ratio']:.2f}x") + print(f" Memory savings: {(1 - 1/stats['compression_ratio']) * 100:.1f}%") + + # With 0.5% change rate, expect very high compression + assert stats['compression_ratio'] > 10.0, \ + f"Expected compression ratio > 10, got {stats['compression_ratio']:.2f}" + + # Spot check some values + random.seed(456) + test_signals = random.sample(trace_sparse.rawsignals, 20) + test_indices = [0, 100, 1000, 10000, 25000, trace_sparse.max_index] + + for signal in test_signals: + for idx in test_indices: + if idx <= trace_sparse.max_index: + # Just verify we can access without error + val = trace_sparse.access_signal_data(signal, 
idx) + assert val is not None + + +class TestVcdEdgeCases: + """Test edge cases and special scenarios.""" + + def test_signal_never_changes(self): + """Test signal that never changes after initial value.""" + vcd = """$timescale 1ns $end +$scope module top $end +$var wire 1 ! clk $end +$var wire 8 @ static_data $end +$upscope $end +$enddefinitions $end +#0 +$dumpvars +0! +b10101010 @ +$end +#10 +1! +#20 +0! +#30 +1! +#40 +0! +#50 +1! +""" + tc_dense = TraceContainer() + trace_dense = TraceVcd(vcd, 'd', tc_dense, from_string=True) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 's', tc_sparse, from_string=True) + + # static_data should have same value at all timestamps + for idx in range(trace_dense.max_index + 1): + dense_val = trace_dense.access_signal_data('top.static_data', idx) + sparse_val = trace_sparse.access_signal_data('top.static_data', idx) + assert dense_val == sparse_val == 170 # 0b10101010 = 170 + + def test_signal_changes_every_timestamp(self): + """Test signal that changes at every timestamp (worst case for sparse).""" + vcd = generate_vcd(n_signals=10, n_timestamps=100, change_probability=1.0) + + tc_dense = TraceContainer() + trace_dense = TraceVcd(vcd, 'd', tc_dense, from_string=True) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 's', tc_sparse, from_string=True) + + # Values should still match + for signal in trace_dense.rawsignals: + for idx in range(trace_dense.max_index + 1): + dense_val = trace_dense.access_signal_data(signal, idx) + sparse_val = trace_sparse.access_signal_data(signal, idx) + assert dense_val == sparse_val + + # Compression should be ~1x (no benefit, but no penalty either) + stats = trace_sparse.memory_stats() + assert stats['compression_ratio'] >= 0.9 # Allow small overhead + + def test_x_and_z_values(self): + """Test handling of X and Z values on single-bit signals. 
+ + Note: Multi-bit signals with x/z (like bxxxx or bxx10) are a known + limitation in TraceVcd - it fails to parse them. This test only + covers single-bit x/z values which work correctly. + """ + vcd = """$timescale 1ns $end +$scope module top $end +$var wire 1 ! flag $end +$var wire 1 @ enable $end +$upscope $end +$enddefinitions $end +#0 +$dumpvars +x! +z@ +$end +#10 +1! +#20 +0! +1@ +#30 +z! +0@ +""" + tc_dense = TraceContainer() + trace_dense = TraceVcd(vcd, 'd', tc_dense, from_string=True) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 's', tc_sparse, from_string=True) + + for signal in ['top.flag', 'top.enable']: + for idx in range(trace_dense.max_index + 1): + dense_val = trace_dense.access_signal_data(signal, idx) + sparse_val = trace_sparse.access_signal_data(signal, idx) + assert dense_val == sparse_val, f"Mismatch at {signal}[{idx}]" + + def test_real_valued_signals(self): + """Test handling of real (floating point) signals.""" + vcd = """$timescale 1ns $end +$scope module top $end +$var real 64 ! temperature $end +$upscope $end +$enddefinitions $end +#0 +$dumpvars +r25.5 ! +$end +#10 +r26.7 ! +#20 +r25.5 ! +#30 +r100.0 ! 
+""" + tc_dense = TraceContainer() + trace_dense = TraceVcd(vcd, 'd', tc_dense, from_string=True) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 's', tc_sparse, from_string=True) + + for idx in range(trace_dense.max_index + 1): + dense_val = trace_dense.access_signal_data('top.temperature', idx) + sparse_val = trace_sparse.access_signal_data('top.temperature', idx) + assert dense_val == sparse_val, f"Mismatch at idx {idx}: {dense_val} vs {sparse_val}" + + +class TestVcdComparisonBenchmark: + """Benchmark tests to measure performance difference.""" + + @pytest.mark.slow + def test_access_performance(self): + """Measure access time for both parsers.""" + import time + + vcd = generate_vcd(n_signals=200, n_timestamps=5000, change_probability=0.02) + + # Load both + tc_dense = TraceContainer() + trace_dense = TraceVcd(vcd, 'd', tc_dense, from_string=True) + + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 's', tc_sparse, from_string=True) + + n_accesses = 10000 + random.seed(789) + + # Generate random access pattern + signals = trace_dense.rawsignals + accesses = [(random.choice(signals), random.randint(0, trace_dense.max_index)) + for _ in range(n_accesses)] + + # Benchmark dense + start = time.perf_counter() + for sig, idx in accesses: + trace_dense.access_signal_data(sig, idx) + dense_time = time.perf_counter() - start + + # Benchmark sparse + start = time.perf_counter() + for sig, idx in accesses: + trace_sparse.access_signal_data(sig, idx) + sparse_time = time.perf_counter() - start + + print(f"\nAccess performance ({n_accesses:,} random accesses):") + print(f" Dense: {dense_time*1000:.2f} ms ({n_accesses/dense_time:.0f} accesses/sec)") + print(f" Sparse: {sparse_time*1000:.2f} ms ({n_accesses/sparse_time:.0f} accesses/sec)") + print(f" Ratio: {sparse_time/dense_time:.2f}x") + + stats = trace_sparse.memory_stats() + print(f" Memory compression: {stats['compression_ratio']:.2f}x") + + +if __name__ == '__main__': + # Run a 
quick sanity check + print("Generating test VCD...") + vcd = generate_vcd(n_signals=100, n_timestamps=1000, change_probability=0.02) + print(f"VCD size: {len(vcd):,} bytes") + + print("\nLoading with dense parser...") + tc_dense = TraceContainer() + trace_dense = TraceVcd(vcd, 'd', tc_dense, from_string=True) + + print("Loading with sparse parser...") + tc_sparse = TraceContainer() + trace_sparse = TraceVcdSparse(vcd, 's', tc_sparse, from_string=True) + + print("\nComparing values...") + mismatches = 0 + for signal in trace_dense.rawsignals[:10]: # Check first 10 signals + for idx in range(min(100, trace_dense.max_index + 1)): + d = trace_dense.access_signal_data(signal, idx) + s = trace_sparse.access_signal_data(signal, idx) + if d != s: + mismatches += 1 + print(f" MISMATCH: {signal}[{idx}] dense={d} sparse={s}") + + if mismatches == 0: + print(" All values match!") + + stats = trace_sparse.memory_stats() + print(f"\nMemory stats:") + print(f" Signals: {stats['n_signals']}") + print(f" Timestamps: {stats['n_timestamps']}") + print(f" Total changes: {stats['total_changes']:,}") + print(f" Dense equivalent: {stats['dense_equivalent']:,}") + print(f" Compression ratio: {stats['compression_ratio']:.2f}x") diff --git a/wal/trace/container.py b/wal/trace/container.py index 4040214..37c60c1 100644 --- a/wal/trace/container.py +++ b/wal/trace/container.py @@ -5,6 +5,7 @@ from wal.ast_defs import VirtualSignal from wal.trace.trace import Trace from wal.trace.vcd import TraceVcd +from wal.trace.vcd_sparse import TraceVcdSparse from wal.trace.csvtrace import TraceCsv class TraceContainer: @@ -16,8 +17,16 @@ def __init__(self): self.index_stack = [] - def load(self, file, tid=None, from_string=False, keep_signals=None): - '''Load a trace from file and add it under trace id tid.''' + def load(self, file, tid=None, from_string=False, keep_signals=None, sparse=False): + '''Load a trace from file and add it under trace id tid. 
class TraceVcdSparse(Trace):
    '''Memory-efficient VCD trace using sparse storage.

    Instead of storing a value for every signal at every timestamp,
    this implementation stores only the actual value changes and uses
    binary search to find values at any given index.

    For every signal name, self.changes holds a pair of parallel lists
    (change_indices, values): values[k] is the raw VCD value that becomes
    active at timestamp index change_indices[k] and stays active until the
    next entry. Index lists are strictly increasing and always start at 0.
    '''

    SKIPPED_COMMANDS_HEADER = {'$comment', '$version', '$date'}

    # First characters of value-change tokens. The VCD grammar allows both
    # cases for the vector/real prefixes and for the x/z scalar states.
    _VECTOR_PREFIXES = frozenset('bBrR')
    _SCALAR_STATES = frozenset('xzXZ')

    # Name rewriting: "[7:0]" ranges are dropped, "[3]" and "(3)" become "<3>"
    _RE_RANGE = re.compile(r'\[[0-9]+:[0-9]+\]')
    _RE_BRACKET_INDEX = re.compile(r'\[([0-9]+)\]')
    _RE_PAREN_INDEX = re.compile(r'\(([0-9]+)\)')

    def __init__(self, filename, tid, container, from_string=False, keep_signals=None):
        '''Parse a VCD file (or VCD text) into sparse storage.

        Args:
            filename: Path of the VCD file, or the VCD text itself when
                from_string is True
            tid: Trace identifier, passed on to Trace
            container: Owning trace container, passed on to Trace
            from_string: If True, filename holds VCD content instead of a path
            keep_signals: Optional iterable of full signal names to load;
                None or an empty iterable loads every signal

        A missing file prints an error message and exits the process with
        status 1.
        '''
        super().__init__(tid, filename, container)
        self.timestamps = []
        self.lookup = None
        self.scopes = []
        self.rawsignals = []
        self.all_ids = set()
        self.index2ts = []
        self.name2id = {}
        self.signalinfo = {}
        self.filename = filename
        self.keep_signals = set(keep_signals) if keep_signals else None

        # Sparse storage: {signal_name: (change_indices, values)}
        # change_indices[i] is the index at which values[i] becomes active
        self.changes = {}

        if from_string:
            self.parse(filename)
        else:
            try:
                with open(filename) as f:
                    vcddata = f.read()
            except FileNotFoundError:
                print(f'Error while loading {filename}. File not found.')
                sys.exit(1)
            self.parse(vcddata)

        self.all_timestamps = self.timestamps.copy()
        self.index = 0
        self.max_index = len(self.index2ts) - 1
        self.signals = set(Trace.SPECIAL_SIGNALS + self.rawsignals)

        self.id2name = {v: k for k, v in self.name2id.items()}
        self.rawsignals_by_handle = [self.id2name[s] for s in self.all_ids if s in self.id2name]
        self.signals_by_handle = set(self.rawsignals_by_handle)

    def parse(self, vcddata):
        '''Parse VCD data into sparse representation.

        Fills the scope/signal tables from the header, the timestamp lists
        from the "#<time>" markers and self.changes from the value changes.

        Raises:
            ValueError: if a $scope or $var declaration is cut off
        '''
        tokens = vcddata.split()
        dump_start = self._parse_header(tokens)
        sparse_data = self._parse_value_changes(tokens, dump_start)

        # Convert from id-based to name-based storage. Names sharing an
        # identifier code share the same pair of lists.
        self.changes = {signal: sparse_data[self.name2id[signal]]
                        for signal in self.rawsignals}

    @staticmethod
    def _read_command(tokens, start):
        '''Return (arguments, next_index) of the command whose keyword is at start.

        Arguments are the tokens between the keyword and the next '$end'.
        A command that is never terminated extends to the end of the input.
        '''
        try:
            end = tokens.index('$end', start + 1)
        except ValueError:
            end = len(tokens)
        return tokens[start + 1:end], end + 1

    @classmethod
    def _normalize_name(cls, name, strip_range=False):
        '''Rewrite index suffixes of a scope or signal name to the <n> form.'''
        if strip_range:
            name = cls._RE_RANGE.sub('', name)
        name = cls._RE_BRACKET_INDEX.sub(r'<\1>', name)
        return cls._RE_PAREN_INDEX.sub(r'<\1>', name)

    def _parse_header(self, tokens):
        '''Parse the declaration section; return the index of the first dump token.

        Stops after $enddefinitions. If the input ends before that command is
        seen, everything is treated as header and len(tokens) or more is
        returned, so no value changes are read.
        '''
        scope = []
        n_tokens = len(tokens)
        i = 0

        while i < n_tokens:
            token = tokens[i]

            if token == '$scope':
                # $scope <type> <name> $end
                if i + 2 >= n_tokens:
                    raise ValueError('VCD error: truncated $scope declaration')
                scope.append(self._normalize_name(tokens[i + 2]))
                self.scopes.append('.'.join(scope))
                i += 4

            elif token == '$var':
                # $var <kind> <width> <id> <name> [<range>] $end
                fields = tokens[i + 1:i + 5]
                if len(fields) < 4:
                    raise ValueError('VCD error: truncated $var declaration')
                kind, width, sig_id, name = fields
                name = self._normalize_name(name, strip_range=True)
                fullname = '.'.join(scope + [name])

                if not self.keep_signals or (fullname in self.keep_signals):
                    self.all_ids.add(sig_id)
                    self.rawsignals.append(fullname)
                    self.name2id[fullname] = sig_id
                    self.signalinfo[sig_id] = {
                        'id': sig_id,
                        'name': fullname,
                        'width': int(width),
                        'kind': kind,
                    }

                # Whatever follows the name (usually nothing or a separate
                # "[msb:lsb]" token) is skipped up to the closing $end.
                _, i = self._read_command(tokens, i + 4)

            elif token == '$upscope':
                # Tolerate an unbalanced $upscope instead of crashing on pop()
                if scope:
                    scope.pop()
                i += 2

            elif token == '$enddefinitions':
                return i + 2

            elif token == '$timescale':
                # Accepts "1ns", "1 ns" and any other split of the value;
                # scanning to $end guarantees forward progress.
                fields, i = self._read_command(tokens, i)
                self.timescale = ''.join(fields)

            elif token in TraceVcdSparse.SKIPPED_COMMANDS_HEADER:
                _, i = self._read_command(tokens, i)

            else:
                i += 1

        return i

    def _parse_value_changes(self, tokens, start):
        '''Parse the dump section starting at tokens[start].

        Appends every "#<time>" to self.timestamps and self.index2ts and
        returns {signal_id: (change_indices, values)} for all loaded ids.
        '''
        tracked = self.all_ids
        sparse_data = {sig_id: ([], []) for sig_id in tracked}
        # Last value seen per signal; rewrites of the same value are dropped
        current_values = dict.fromkeys(tracked)

        def record(sig_id, value, index):
            if sig_id not in tracked or current_values[sig_id] == value:
                return
            current_values[sig_id] = value
            indices, values = sparse_data[sig_id]
            if indices and indices[-1] == index:
                # Several changes at one timestamp: only the last is visible
                values[-1] = value
            else:
                indices.append(index)
                values.append(value)

        n_tokens = len(tokens)
        vector_prefixes = self._VECTOR_PREFIXES
        scalar_states = self._SCALAR_STATES
        current_index = -1  # Will be 0 after first timestamp
        i = start

        while i < n_tokens:
            token = tokens[i]
            first_char = token[0]
            # Values dumped before the first "#<time>" are attributed to
            # index 0 so that index lists stay sorted and non-negative.
            # NOTE(review): confirm this matches the dense parser's handling.
            index = current_index if current_index > 0 else 0

            if first_char == '#':
                # New timestamp
                time = int(token[1:])
                self.timestamps.append(time)
                self.index2ts.append(time)
                current_index += 1
                i += 1

            elif first_char == '0' or first_char == '1':
                # Single-bit value change, stored as 'b0' / 'b1'
                record(token[1:], 'b' + first_char, index)
                i += 1

            elif first_char in vector_prefixes:
                # Vector or real: "b0000 id" / "r1.5 id"; the prefix is
                # stored in lower case, which is what _convert_value expects
                if i + 1 < n_tokens:
                    record(tokens[i + 1], first_char.lower() + token[1:], index)
                i += 2

            elif first_char in scalar_states:
                # Scalar value change (x, z, X, Z)
                record(token[1:], first_char, index)
                i += 1

            elif token == '$comment':
                _, i = self._read_command(tokens, i)

            else:
                # Skip $dumpvars, $dumpall, $dumpoff, $dumpon, $end
                i += 1

        # Ensure all signals have an initial value at index 0
        for indices, values in sparse_data.values():
            if not indices or indices[0] != 0:
                indices.insert(0, 0)
                values.insert(0, 'x')

        return sparse_data

    def access_signal_data(self, name, index):
        '''Access signal value at given index using binary search.

        When sampling points are active, index is first mapped through
        self.lookup to an index of the full trace.
        '''
        if self.lookup:
            index = self.lookup[index]

        indices, values = self.changes[name]

        # Binary search: find rightmost change index <= target
        pos = bisect.bisect_right(indices, index) - 1

        if pos < 0:
            return 'x'

        return self._convert_value(values[pos])

    def _convert_value(self, value):
        '''Convert VCD value string to appropriate Python type.

        'b<bits>' becomes an int, or is returned unchanged when the bits
        contain x or z; 'r<number>' becomes a float; any other value (such as
        a scalar 'x' or 'z') is returned as is.
        '''
        if isinstance(value, str):
            if value.startswith('b'):
                try:
                    return int(value[1:], 2)
                except ValueError:
                    # Contains x or z
                    return value
            if value.startswith('r'):
                return float(value[1:])
        return value

    def set_sampling_points(self, new_indices):
        '''Updates the indices at which data is sampled.

        Afterwards index k of this trace refers to new_indices[k] of the full
        trace, and self.timestamps maps positions to the distinct timestamps
        of those indices in first-seen order.
        '''
        self.lookup = dict(enumerate(new_indices))
        new_timestamps = [self.all_timestamps[i] for i in new_indices]
        self.timestamps = list(dict.fromkeys(new_timestamps))
        self.timestamps = dict(enumerate(self.timestamps))
        self.index = 0
        self.max_index = len(self.timestamps.keys()) - 1

    def signal_width(self, name):
        '''Returns the width of a signal.'''
        return self.signalinfo[self.name2id[name]]['width']

    def memory_stats(self):
        '''Return memory usage statistics for debugging.

        The result counts stored entries, not bytes: 'total_changes' is the
        number of stored change entries, 'dense_equivalent' is
        n_signals * n_timestamps, and 'compression_ratio' is their quotient
        (0 when nothing is stored).
        '''
        total_changes = sum(len(indices) for indices, _ in self.changes.values())
        n_signals = len(self.changes)
        n_timestamps = len(self.index2ts)
        dense_size = n_signals * n_timestamps

        return {
            'n_signals': n_signals,
            'n_timestamps': n_timestamps,
            'total_changes': total_changes,
            'dense_equivalent': dense_size,
            'compression_ratio': dense_size / total_changes if total_changes > 0 else 0,
        }