From e41abe61e64c5eebc93e03c94585d3b6beeb59fd Mon Sep 17 00:00:00 2001 From: Jackson Kontny Date: Thu, 23 Apr 2026 14:53:20 -0500 Subject: [PATCH] Add --testmon-cov option to generate Cobertura XML coverage reports Reconstructs line-level coverage from testmon's block-level fingerprint data by storing line level data in testmon db Works standalone (reading existing .testmondata) or alongside --testmon. --- compare_cov.py | 255 ++++++++++++++++++++++++++++++++++++++ testmon/cobertura.py | 219 ++++++++++++++++++++++++++++++++ testmon/db.py | 125 ++++++++++++++++++- testmon/pytest_testmon.py | 89 +++++++++++++ testmon/testmon_core.py | 19 ++- 5 files changed, 705 insertions(+), 2 deletions(-) create mode 100644 compare_cov.py create mode 100644 testmon/cobertura.py diff --git a/compare_cov.py b/compare_cov.py new file mode 100644 index 0000000..f4a9a6e --- /dev/null +++ b/compare_cov.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python +""" +Throwaway script: compare pytest-cov vs --testmon-cov output side-by-side. + +Creates a temp project, runs both tools, parses the Cobertura XML, and prints +a per-file / per-line diff so you can see where block-level reconstruction +diverges from real line-level coverage. +""" + +import os +import shutil +import subprocess +import sys +import tempfile +import xml.etree.ElementTree as ET +from pathlib import Path + +# ── sample project ────────────────────────────────────────────────────────── + +SAMPLE_MODULE = """\ +class Calculator: + def add(self, a, b): + return a + b + + def subtract(self, a, b): + return a - b + + def multiply(self, a, b): + return a * b + + def divide(self, a, b): + if b == 0: + raise ValueError("division by zero") + return a / b + + +def greet(name): + if name: + return f"Hello, {name}!" + return "Hello, stranger!" 
def parse_cobertura(xml_path):
    """Return {filename: {line_number: hits}} parsed from a Cobertura XML file."""
    root = ET.parse(xml_path).getroot()
    parsed = {}
    for package_el in root.findall(".//package"):
        for class_el in package_el.findall(".//class"):
            # One <class> per source file; <line> children carry number/hits.
            parsed[class_el.get("filename")] = {
                int(line_el.get("number")): int(line_el.get("hits"))
                for line_el in class_el.findall(".//line")
            }
    return parsed


def run(cmd, cwd):
    """Run *cmd* in *cwd* (echoed), exiting the script on an unexpected rc."""
    print(f"  $ {' '.join(cmd)}")
    completed = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
    # rc 5 == "no tests collected" — expected for the standalone testmon-cov run.
    if completed.returncode in (0, 5):
        return completed
    print(f"  STDOUT:\n{completed.stdout}")
    print(f"  STDERR:\n{completed.stderr}")
    sys.exit(f"command failed (rc={completed.returncode})")


def compare(cov_data, tm_data, source_lines):
    """Print a side-by-side comparison for one file. Return (match, mismatch) counts."""
    line_numbers = sorted(set(cov_data.keys()) | set(tm_data.keys()))
    match = mismatch = cov_only = tm_only = 0

    print(f"  {'Line':>5} {'Source':<55} {'pytest-cov':>10} {'testmon-cov':>11} {'':>5}")
    print(f"  {'─'*5} {'─'*55} {'─'*10} {'─'*11} {'─'*5}")

    for line_number in line_numbers:
        text = source_lines.get(line_number, "").rstrip()
        if len(text) > 55:
            text = text[:52] + "..."
        cov_hits = cov_data.get(line_number)
        tm_hits = tm_data.get(line_number)

        cov_str = "-" if cov_hits is None else str(cov_hits)
        tm_str = "-" if tm_hits is None else str(tm_hits)

        # Guard-clause form of the original nested if/elif: a line present in
        # only one report is tagged with its origin; otherwise we compare
        # covered-vs-uncovered (hits > 0), not exact hit counts.
        if cov_hits is None:
            flag = "  tm-only"
            tm_only += 1
        elif tm_hits is None:
            flag = "  cov-only"
            cov_only += 1
        elif (cov_hits > 0) == (tm_hits > 0):
            flag = "  ✓"
            match += 1
        else:
            flag = "  ✗ DIFF"
            mismatch += 1

        print(f"  {line_number:>5} {text:<55} {cov_str:>10} {tm_str:>11} {flag}")

    return match, mismatch, cov_only, tm_only
def get_file_coverage_from_checksums(rootdir, filename, covered_checksums):
    """Reconstruct line coverage from block checksums (fallback path).

    Re-parses *filename* into testmon blocks and marks every line of each
    block whose checksum appears in *covered_checksums* as covered.

    Returns:
        (covered_lines, all_block_lines) — two sets of 1-based line numbers.
        Both are empty when the file cannot be read/hashed.
    """
    code, fsha = get_source_sha(directory=rootdir, filename=filename)
    if not fsha:
        return set(), set()
    mtime = os.path.getmtime(os.path.join(rootdir, filename))
    module = Module(
        source_code=code,
        mtime=mtime,
        ext=filename.rsplit(".", 1)[1] if "." in filename else "py",
        fs_fsha=fsha,
        filename=filename,
        rootdir=rootdir,
    )
    covered_lines = set()
    all_block_lines = set()

    # Gap-mark checksums are synthetic separators, not real code blocks;
    # counting them would pollute the line totals.
    gap_checksums = set(INVERTED_GAP_MARKS_CHECKSUMS.keys())

    for block, checksum in zip(module.blocks, module.checksums):
        if checksum in gap_checksums:
            continue
        lines_in_block = set(range(block.start, block.end + 1))
        all_block_lines |= lines_in_block
        if checksum in covered_checksums:
            covered_lines |= lines_in_block

    return covered_lines, all_block_lines


def _get_executable_lines(rootdir, filename):
    """Get the set of executable line numbers for a source file using coverage.py.

    Returns None (best-effort) when coverage.py is unavailable or the file
    cannot be parsed; callers fall back to block-derived line sets.
    """
    try:
        from coverage.python import PythonParser

        filepath = os.path.join(rootdir, filename)
        parser = PythonParser(filename=filepath)
        parser.parse_source()
        return set(parser.statements)
    except Exception:
        return None


def _infer_declaration_coverage(rootdir, filename, covered_lines, all_lines):
    """Mark def/class/import declaration lines as covered when their body is covered.

    coverage.py in context mode doesn't attribute declaration lines (class Foo:,
    def bar():, import x) to any test context. If the body of a function/class
    has covered lines, its declaration must have been executed too.

    Bug fix vs the naive version: every inferred line is restricted to
    *all_lines*, so the result can never contain lines coverage.py does not
    consider executable (previously decorator lines were added without that
    check, which could make covered_lines a strict superset of all_lines and
    push the reported line-rate above 1.0).
    """
    import ast

    filepath = os.path.join(rootdir, filename)
    try:
        with open(filepath) as f:
            source = f.read()
        tree = ast.parse(source)
    except (OSError, SyntaxError):
        # Can't parse — return the input unchanged (best effort).
        return covered_lines

    result = covered_lines.copy()

    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            decl_line = node.lineno
            if decl_line not in all_lines:
                continue
            body_lines = {
                child.lineno
                for child in ast.walk(node)
                if child is not node and hasattr(child, "lineno")
            }
            if body_lines & covered_lines:
                result.add(decl_line)
                for decorator in node.decorator_list:
                    # Only count decorator lines coverage.py deems executable.
                    if (
                        hasattr(decorator, "lineno")
                        and decorator.lineno in all_lines
                    ):
                        result.add(decorator.lineno)

        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            # Import statements always carry lineno; mark them covered since
            # the module was imported (its presence in the report implies so).
            if node.lineno in all_lines:
                result.add(node.lineno)

    # Invariant: covered lines are always a subset of executable lines.
    return result & all_lines


def generate_cobertura_xml(rootdir, db, exec_id, output_file):
    """Generate a Cobertura XML coverage report from testmon data.

    Prefers exact line-level data when the DB has it; otherwise reconstructs
    lines from block checksums. Writes *output_file* under *rootdir* and
    returns the absolute path written.
    """
    use_lines = db.has_line_data(exec_id)

    if use_lines:
        file_lines = db.fetch_all_file_lines(exec_id)
    else:
        file_lines = None

    file_checksums = db.fetch_all_file_checksums(exec_id)

    all_filenames = set(file_checksums.keys())
    if file_lines:
        all_filenames |= set(file_lines.keys())

    total_lines_valid = 0
    total_lines_covered = 0

    packages = {}

    for filename in sorted(all_filenames):
        filepath = os.path.join(rootdir, filename)
        if not os.path.exists(filepath):
            # Stale DB entry for a deleted/moved file — skip it.
            continue

        if use_lines and filename in file_lines:
            covered_lines = file_lines[filename]
            executable = _get_executable_lines(rootdir, filename)
            if executable is not None:
                all_lines = executable
                covered_lines = covered_lines & executable
                covered_lines = _infer_declaration_coverage(
                    rootdir, filename, covered_lines, all_lines
                )
            else:
                # No coverage.py available: the covered set is the best
                # approximation of the executable set we have.
                all_lines = covered_lines.copy()
        else:
            checksums = file_checksums.get(filename, set())
            covered_lines, all_lines = get_file_coverage_from_checksums(
                rootdir, filename, checksums
            )

        if not all_lines:
            continue

        dirname = os.path.dirname(filename)
        package_name = dirname.replace(os.sep, ".") if dirname else "."

        if package_name not in packages:
            packages[package_name] = []

        packages[package_name].append(
            (filename, covered_lines, all_lines)
        )

        total_lines_valid += len(all_lines)
        total_lines_covered += len(covered_lines)

    line_rate = total_lines_covered / total_lines_valid if total_lines_valid else 0

    coverage_el = ET.Element("coverage")
    coverage_el.set("version", "1.0")
    coverage_el.set("timestamp", str(int(time.time() * 1000)))
    coverage_el.set("lines-valid", str(total_lines_valid))
    coverage_el.set("lines-covered", str(total_lines_covered))
    coverage_el.set("line-rate", f"{line_rate:.4f}")
    coverage_el.set("branches-valid", "0")
    coverage_el.set("branches-covered", "0")
    coverage_el.set("branch-rate", "0")
    coverage_el.set("complexity", "0")

    sources_el = ET.SubElement(coverage_el, "sources")
    source_el = ET.SubElement(sources_el, "source")
    source_el.text = rootdir

    packages_el = ET.SubElement(coverage_el, "packages")

    for package_name, files in sorted(packages.items()):
        pkg_lines_valid = 0
        pkg_lines_covered = 0

        package_el = ET.SubElement(packages_el, "package")
        package_el.set("name", package_name)
        package_el.set("branch-rate", "0")
        package_el.set("complexity", "0")

        classes_el = ET.SubElement(package_el, "classes")

        for filename, covered_lines, all_lines in sorted(files):
            class_name = os.path.basename(filename)
            file_line_rate = (
                len(covered_lines) / len(all_lines)
                if all_lines
                else 0
            )

            class_el = ET.SubElement(classes_el, "class")
            class_el.set("name", class_name)
            class_el.set("filename", filename)
            class_el.set("line-rate", f"{file_line_rate:.4f}")
            class_el.set("branch-rate", "0")
            class_el.set("complexity", "0")

            ET.SubElement(class_el, "methods")

            lines_el = ET.SubElement(class_el, "lines")
            for line_num in sorted(all_lines):
                line_el = ET.SubElement(lines_el, "line")
                line_el.set("number", str(line_num))
                line_el.set("hits", "1" if line_num in covered_lines else "0")

            pkg_lines_valid += len(all_lines)
            pkg_lines_covered += len(covered_lines)

        pkg_line_rate = (
            pkg_lines_covered / pkg_lines_valid if pkg_lines_valid else 0
        )
        package_el.set("line-rate", f"{pkg_line_rate:.4f}")

    tree = ET.ElementTree(coverage_el)
    ET.indent(tree, space=" ")
    with open(os.path.join(rootdir, output_file), "wb") as f:
        tree.write(f, encoding="utf-8", xml_declaration=True)

    return os.path.join(rootdir, output_file)
def lines_to_blob(lines):
    """Pack a set of line numbers into a compact BLOB (sorted uint32 array)."""
    packed = array("I", sorted(lines))
    return sqlite3.Binary(packed.tobytes())


def blob_to_lines(blob):
    """Inverse of lines_to_blob: unpack a BLOB back into a set of ints."""
    unpacked = array("I")
    unpacked.frombytes(blob)
    return set(unpacked.tolist())
self._create_temp_tables_statement() + self._create_file_fp_statement() + self._create_test_execution_ffp_statement() + + self._create_test_execution_lines_statement() ) connection.execute(f"PRAGMA user_version = {self.version_compatibility()}") @@ -591,6 +616,104 @@ def all_test_executions(self, exec_id): ) } + def fetch_all_file_checksums(self, exec_id): + """Returns {filename: set_of_checksums} unioning all fingerprints per file.""" + cursor = self.con.execute( + f""" + SELECT f.filename, f.method_checksums + FROM file_fp f + JOIN test_execution_file_fp tefp ON tefp.fingerprint_id = f.id + JOIN test_execution te ON te.id = tefp.test_execution_id + WHERE te.{self._test_execution_fk_column()} = ? + """, + (exec_id,), + ) + result = {} + for row in cursor: + filename = row["filename"] + checksums = set(blob_to_checksums(row["method_checksums"])) + if filename in result: + result[filename] |= checksums + else: + result[filename] = checksums + return result + + def insert_test_execution_lines(self, nodes_files_lines, exec_id): + """Store line-level coverage data for each test execution.""" + with self.con as con: + for test_name, files_lines in nodes_files_lines.items(): + te_row = con.execute( + f""" + SELECT id FROM test_execution + WHERE {self._test_execution_fk_column()} = ? AND test_name = ? 
+ ORDER BY id DESC LIMIT 1 + """, + (exec_id, test_name), + ).fetchone() + if not te_row: + continue + te_id = te_row["id"] + con.execute( + "DELETE FROM test_execution_lines WHERE test_execution_id = ?", + (te_id,), + ) + rows = [] + for filename, lines in files_lines.items(): + if lines: + rows.append((te_id, filename, lines_to_blob(lines))) + if rows: + con.executemany( + "INSERT INTO test_execution_lines (test_execution_id, filename, lines) VALUES (?, ?, ?)", + rows, + ) + + def fetch_all_file_lines(self, exec_id): + """Returns {filename: set_of_line_numbers} unioning line data across all tests.""" + cursor = self.con.execute( + f""" + SELECT tel.filename, tel.lines + FROM test_execution_lines tel + JOIN test_execution te ON te.id = tel.test_execution_id + WHERE te.{self._test_execution_fk_column()} = ? + """, + (exec_id,), + ) + result = {} + for row in cursor: + filename = row["filename"] + lines = blob_to_lines(row["lines"]) + if filename in result: + result[filename] |= lines + else: + result[filename] = lines + return result + + def has_line_data(self, exec_id): + """Check if line-level coverage data exists for this execution.""" + row = self.con.execute( + f""" + SELECT 1 FROM test_execution_lines tel + JOIN test_execution te ON te.id = tel.test_execution_id + WHERE te.{self._test_execution_fk_column()} = ? + LIMIT 1 + """, + (exec_id,), + ).fetchone() + return row is not None + + def fetch_most_recent_environment_id(self, environment_name): + cursor = self.con.execute( + """ + SELECT id FROM environment + WHERE environment_name = ? 
+ ORDER BY id DESC + LIMIT 1 + """, + (environment_name,), + ) + row = cursor.fetchone() + return row["id"] if row else None + def filenames(self, exec_id): cursor = self.con.execute( f""" diff --git a/testmon/pytest_testmon.py b/testmon/pytest_testmon.py index 434f2c9..fe22376 100644 --- a/testmon/pytest_testmon.py +++ b/testmon/pytest_testmon.py @@ -115,6 +115,20 @@ def pytest_addoption(parser): ), ) + group.addoption( + "--testmon-cov", + action="store", + type=str, + dest="testmon_cov", + default=None, + nargs="?", + const="coverage.xml", + help=( + "Generate a Cobertura XML coverage report from testmon data. " + "Optional: output filename (default: coverage.xml)." + ), + ) + parser.addini("environment_expression", "environment expression", default="") parser.addini( "testmon_ignore_dependencies", @@ -272,6 +286,24 @@ def pytest_configure(config): except TestmonException as error: pytest.exit(str(error)) + testmon_cov = config.getoption("testmon_cov", default=None) + if testmon_cov: + try: + if not hasattr(config, "testmon_data"): + environment = config.getoption( + "environment_expression" + ) or eval_environment(config.getini("environment_expression")) + testmon_data = TestmonData.for_readonly( + config.rootdir.strpath, environment=environment + ) + config.testmon_data = testmon_data + config.pluginmanager.register( + TestmonCovReport(config, config.testmon_data, testmon_cov), + "TestmonCovReport", + ) + except TestmonException as error: + pytest.exit(str(error)) + def pytest_report_header(config): tm_conf = config.testmon_config @@ -419,6 +451,11 @@ def pytest_runtest_logreport(self, report): self.testmon_data.save_test_execution_file_fps( test_executions_fingerprints ) + lines_with_imports = _merge_import_lines( + report.nodes_files_lines, + getattr(self.testmon, "_import_lines", {}), + ) + self.testmon_data.save_test_execution_lines(lines_with_imports) def pytest_keyboard_interrupt(self, excinfo): # pylint: disable=unused-argument if self._running_as == 
def _merge_import_lines(nodes_files_lines, import_lines):
    """Merge import-time lines (the "" context) into each test's file coverage.

    coverage.py attributes lines executed at import time (class/def statements,
    top-level code) to the empty "" context. Testmon strips this context for
    fingerprinting, but for coverage reporting we need these lines attributed
    to the tests that triggered the import.
    """
    if not import_lines:
        # Nothing import-level to merge — hand back the input untouched.
        return nodes_files_lines
    return {
        test_name: {
            filename: lines | import_lines.get(filename, set())
            for filename, lines in files_lines.items()
        }
        for test_name, files_lines in nodes_files_lines.items()
    }
class TestmonCovReport:
    """Pytest plugin object: writes a Cobertura XML report at session end."""

    def __init__(self, config, testmon_data, output_file):
        self.config = config
        self.testmon_data = testmon_data
        self.output_file = output_file

    @pytest.hookimpl(trylast=True)
    def pytest_sessionfinish(self, session):
        # Imported lazily so the module only loads when the report actually runs.
        from testmon.cobertura import generate_cobertura_xml

        report_path = generate_cobertura_xml(
            self.testmon_data.rootdir,
            self.testmon_data.db,
            self.testmon_data.exec_id,
            self.output_file,
        )
        reporter = session.config.pluginmanager.get_plugin(
            "terminalreporter"
        )
        if reporter is not None:
            reporter.write_line(
                f"testmon-cov: wrote {report_path}"
            )
" + "Run tests with --testmon first to collect data." + ) + instance.exec_id = exec_id + return instance + @classmethod def for_worker( # pylint: disable=too-many-arguments cls, @@ -404,6 +417,9 @@ def avg_durations(self) -> dict: def save_test_execution_file_fps(self, test_executions_fingerprints): self.db.insert_test_file_fps(test_executions_fingerprints, self.exec_id) + def save_test_execution_lines(self, nodes_files_lines): + self.db.insert_test_execution_lines(nodes_files_lines, self.exec_id) + def fetch_saving_stats(self, select): return self.db.fetch_saving_stats(self.exec_id, select) @@ -464,6 +480,7 @@ def __init__( self.check_stack = [] self.is_started = False self._interrupted_at = None + self._import_lines = {} def start_cov(self): if not self.cov._started: @@ -598,7 +615,7 @@ def get_nodes_files_lines(self, dont_include): files_lines.setdefault(file, set()).add(lineno) nodes_files_lines.pop(dont_include, None) self.batched_test_names.discard(dont_include) - nodes_files_lines.pop("", None) + self._import_lines = nodes_files_lines.pop("", None) or {} for test_name in self.batched_test_names: if home_file(test_name) not in nodes_files_lines.setdefault(test_name, {}): nodes_files_lines[test_name].setdefault(home_file(test_name), {1})