Remote Spark test orchestration framework #207

@sdebruyn

Summary

Add a testing orchestration framework that runs FabricSpark integration tests as Spark jobs directly on Microsoft Fabric infrastructure instead of locally. It offers two opt-in modes and leaves the default local execution unchanged. Remote execution integrates natively with pytest via a --remote flag, so no separate CLI script is needed.

Context

FabricSpark integration tests currently run on the developer's machine, connecting to Fabric Lakehouses via Livy sessions over HTTP. This works, but it requires a local Python setup, credentials, and network access, and it leaves test artifacts (compiled SQL, logs) in ephemeral /tmp directories that are inaccessible from the Fabric portal.

This plan adds two opt-in execution modes that let tests run closer to the data — either as a Spark job inside Fabric itself, or with artifacts written to a locally-mounted lakehouse. The existing default (local execution via Livy) is unchanged.

Key design choice: Remote execution is triggered via a native --remote pytest flag. The pytest_runtestloop hook intercepts execution after collection and delegates to Fabric. The developer just runs pytest --de --remote -k "TestFoo" and gets normal pytest output.

Developer Experience

$ uv run pytest --de --remote -k "TestBasicFabricSpark" -v

======================== test session starts ========================
collected 5 items

Remote execution: syncing project to lakehouse...
  Sync complete (12 files changed, 3.2s)

Remote execution: submitting Spark job...
  Job URL: https://app.fabric.microsoft.com/groups/.../sparkJobDefinitions/.../runs/...
  Remote args: --de -k "TestBasicFabricSpark" -v --junitxml /lakehouse/.../results.xml

Waiting for Spark job...
  [0:30] Running...
  [1:00] Running...
  [2:15] Completed (2m15s)

Downloading results...

tests/fabricspark/adapter/test_basic.py::TestBasicFabricSpark::test_base PASSED
tests/fabricspark/adapter/test_basic.py::TestBasicFabricSpark::test_schema_test PASSED
tests/fabricspark/adapter/test_basic.py::TestBasicFabricSpark::test_empty FAILED

======================== FAILURES ========================
tests/fabricspark/adapter/test_basic.py::TestBasicFabricSpark::test_empty
  (remote) AssertionError: Expected 0 rows, got 1

  Captured stdout (remote):
    Running dbt with args: ['run', '--select', 'model_empty']

======================== 4 passed, 1 failed in 2m18s (remote) ========================

Architecture

┌─────────────────────────────────────────────────────────────────┐
│  Developer machine                                              │
│                                                                 │
│  pytest --de --remote -k "TestFoo"                              │
│    │                                                            │
│    ├─ Normal collection (discovers test items)                  │
│    ├─ pytest_runtestloop hook intercepts                        │
│    │   ├─ sync.py ──── azcopy sync ────────┼───► OneLake       │
│    │   ├─ spark_job_client.py ─────────────┼───► Fabric API    │
│    │   ├─ poll until done                                       │
│    │   ├─ download junitxml + artifacts                         │
│    │   └─ result_reporter.py ── fire TestReport hooks           │
│    └─ pytest renders results normally                           │
│                                                                 │
│  Mode B (mounted): conftest detects ONELAKE_PATH,               │
│  writes project_root/profiles_root/logs there directly          │
└─────────────────────────────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│  Fabric Spark (Mode A only)                                     │
│                                                                 │
│  Spark Job Definition executes spark_entry_point.py             │
│    ├─ pip install from requirements-remote.txt (exact pins)     │
│    ├─ pip install -e project --no-deps                          │
│    ├─ Sets FABRIC_TEST_SPARK_EXEC_MODE=remote                   │
│    ├─ Runs pytest with forwarded args + --junitxml              │
│    └─ Writes results.xml to /lakehouse/.../dbt-test-artifacts/  │
│                                                                 │
│  conftest detects remote mode → project_root points to          │
│  /lakehouse/default/Files/dbt-test-artifacts/<prefix>/          │
└─────────────────────────────────────────────────────────────────┘

Components

1. tests/conftest.py — hook registration + artifact path redirect

New option in pytest_addoption:

parser.addoption(
    "--remote",
    action="store_true",
    default=False,
    help="Run FabricSpark tests as a remote Spark job on Fabric infrastructure",
)

New pytest_runtestloop hook (delegates to conftest_plugin module):

def pytest_runtestloop(session):
    if not session.config.getoption("--remote", default=False):
        return None  # Fall through to default
    from tests.spark_remote.conftest_plugin import remote_runtestloop
    return remote_runtestloop(session)

Since pytest_runtestloop is declared with @hookspec(firstresult=True), pytest stops at the first implementation that returns a non-None value, so returning True here prevents the default loop from running.

Validation: If --remote is set without --de, raise an error: "--remote requires --de (FabricSpark tests only)".

Artifact path redirect (needed on the REMOTE side when tests execute inside Spark):

Add import py to imports.

def _on_fabric_project_base() -> Path | None:
    """Return the artifact base directory for remote/mounted mode, or None for local runs."""
    mode = os.getenv("FABRIC_TEST_SPARK_EXEC_MODE", "").lower()
    if mode == "remote":
        return Path("/lakehouse/default/Files/dbt-test-artifacts")
    if mode == "mounted":
        path = os.getenv("FABRIC_TEST_ONELAKE_PATH")
        if not path:
            raise ValueError(
                "FABRIC_TEST_ONELAKE_PATH required when FABRIC_TEST_SPARK_EXEC_MODE=mounted"
            )
        return Path(path) / "dbt-test-artifacts"
    return None

Override fixtures (must return py.path.local for upstream compatibility with tmpdir_factory.mktemp() return type):

@pytest.fixture(scope="class")
def project_root(tmpdir_factory, prefix):
    base = _on_fabric_project_base()
    if base is None:
        project_root = tmpdir_factory.mktemp("project")
        print(f"\n=== Test project_root: {project_root}")
        return project_root
    path = base / prefix / "project"
    path.mkdir(parents=True, exist_ok=True)
    print(f"\n=== Test project_root: {path}")
    return py.path.local(path)

@pytest.fixture(scope="class")
def profiles_root(tmpdir_factory, prefix):
    base = _on_fabric_project_base()
    if base is None:
        return tmpdir_factory.mktemp("profile")
    path = base / prefix / "profile"
    path.mkdir(parents=True, exist_ok=True)
    return py.path.local(path)

Replace existing logs_dir fixture (currently at line 231):

@pytest.fixture(scope="class")
def logs_dir(request, prefix):
    base = _on_fabric_project_base()
    if base is not None:
        dbt_log_dir = str(base / prefix / "logs")
    else:
        dbt_log_dir = os.path.join(request.config.rootdir, "logs", prefix)
    os.makedirs(dbt_log_dir, exist_ok=True)
    print(f"\n=== Test logs_dir: {dbt_log_dir}\n")
    os.environ["DBT_LOG_PATH"] = dbt_log_dir
    yield dbt_log_dir
    del os.environ["DBT_LOG_PATH"]

Key differences from current logs_dir:

  • Adds os.makedirs(..., exist_ok=True) — needed because OneLake paths don't pre-exist
  • Routes to lakehouse path when mode is set
  • Keeps the DBT_LOG_PATH env var pattern unchanged

2. tests/spark_remote/conftest_plugin.py — the hook implementation

Core orchestration logic called from pytest_runtestloop:

def remote_runtestloop(session: pytest.Session) -> bool:
    # 1. Handle --collect-only
    if session.config.option.collectonly:
        return True

    # 2. If no items collected, nothing to do
    if not session.items:
        return True

    # 3. Build remote pytest args
    remote_args = _build_remote_args(session)

    # 4. Sync project to lakehouse
    from tests.spark_remote.orchestrator import RemoteTestOrchestrator
    orchestrator = RemoteTestOrchestrator.from_env()
    orchestrator.sync_project()

    # 5. Submit and wait for Spark job
    job_result = orchestrator.run_spark_job(remote_args)

    # 6. Download artifacts and report results
    from tests.spark_remote.result_reporter import report_remote_results
    report_remote_results(session, orchestrator, job_result)

    return True

_build_remote_args(session) logic:

  • Preserves: -k, -v, --de, --with-grants, --with-python
  • Strips: --remote (to avoid recursion)
  • Adds: --junitxml=/lakehouse/default/Files/dbt-test-artifacts/results.xml
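
The argument rules above can be sketched as a pure function. This is only an illustration: build_remote_args, its parameters, and the two constants are hypothetical names, and in the real hook the inputs would presumably be read from session.config (for example config.option.keyword and config.option.verbose).

```python
from __future__ import annotations

# The junitxml path is the one named in the list above; the constant names are hypothetical.
REMOTE_JUNITXML = "/lakehouse/default/Files/dbt-test-artifacts/results.xml"
PASSTHROUGH_FLAGS = ("--de", "--with-grants", "--with-python")


def build_remote_args(keyword: str, verbosity: int, enabled_flags: set[str]) -> list[str]:
    """Build the argv forwarded to the remote pytest run.

    Only allow-listed flags are forwarded, so --remote is dropped and the
    remote run cannot re-submit itself.
    """
    args = [flag for flag in PASSTHROUGH_FLAGS if flag in enabled_flags]
    if keyword:
        args += ["-k", keyword]
    if verbosity > 0:
        args.append("-" + "v" * verbosity)
    args.append(f"--junitxml={REMOTE_JUNITXML}")
    return args
```

Using an allow-list rather than stripping --remote from the raw command line means any option not yet vetted for remote use is dropped by default.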

3. tests/spark_remote/result_reporter.py — junitxml to TestReport mapping

Parses downloaded results.xml and fires pytest hooks for each test result.

Nodeid matching: Remote pytest runs with the same test root directory layout (synced via azcopy), so nodeids are identical.

TestReport construction: Each test needs THREE reports (setup/call/teardown), following pytest's protocol:

def report_remote_results(session, orchestrator, job_result):
    results_path = orchestrator.download_results()

    if results_path is None:
        _report_all_as_error(session, f"Remote job {job_result.status}: {job_result.error_message}")
        return

    results = _parse_junitxml(results_path)
    result_map = {r.nodeid: r for r in results}

    for item in session.items:
        ihook = item.ihook
        ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)

        result = result_map.get(item.nodeid)
        if result is None:
            _report_item_as_error(item, "Test not found in remote execution results")
        else:
            _report_item_result(item, result)

        ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location)

Sections from junitxml <system-out>/<system-err>:

sections = []
if stdout_text:
    sections.append(("Captured stdout (remote)", stdout_text))
if stderr_text:
    sections.append(("Captured stderr (remote)", stderr_text))

Nodeid reconstruction from junitxml:

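  • With junit_family=xunit1 (legacy), each <testcase> carries a file attribute in addition to classname and name
  • Reconstruct: file_attr::ClassName::test_name
  • Heuristic for xunit2 (pytest's default, no file attribute): class names start with an uppercase letter, so split classname at the first such component; everything before it is the dotted module path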
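
A minimal sketch of the reconstruction, using only the standard library. The function name nodeid_from_testcase is hypothetical, and the uppercase-initial heuristic would misfire for packages or modules whose names start with a capital letter.

```python
from __future__ import annotations

import xml.etree.ElementTree as ET


def nodeid_from_testcase(case: ET.Element) -> str:
    """Rebuild a pytest nodeid from one junitxml <testcase> element."""
    name = case.get("name", "")
    parts = case.get("classname", "").split(".")
    file_attr = case.get("file")
    # Index of the first dotted component that looks like a class (uppercase initial).
    split_at = next((i for i, p in enumerate(parts) if p[:1].isupper()), len(parts))
    module_parts, class_parts = parts[:split_at], parts[split_at:]
    # Prefer the file attribute when present; otherwise derive the path from the module.
    path = file_attr or "/".join(module_parts) + ".py"
    return "::".join([path, *class_parts, name])
```

Module-level test functions have no class component, so they come out as path::test_name.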

4. tests/spark_remote/spark_job_client.py — Fabric REST API wrapper

Wraps the Spark Job Definition APIs. Uses azure.identity.AzureCliCredential for tokens (https://analysis.windows.net/powerbi/api/.default — same scope as FabricTokenProvider.FABRIC_CREDENTIAL_SCOPE).

Key methods:

  • __init__(workspace_id, token_provider_fn) — takes workspace ID and a callable returning bearer tokens
  • list_spark_job_definitions() — paginated GET /v1/workspaces/{wid}/sparkJobDefinitions
  • find_by_name(name) -> dict | None — find existing definition
  • create_spark_job_definition(name, lakehouse_id, executable_path) -> str — creates definition with base64-encoded SparkJobDefinitionV1 payload, returns item ID
  • run_on_demand(item_id, command_line_args: list[str]) -> tuple[str, str] — POST to .../jobs/instances?jobType=sparkjob with executionData, returns (item_id, job_instance_id) parsed from Location header
  • get_job_instance(item_id, job_instance_id) -> dict — poll job status
  • poll_until_done(item_id, job_instance_id, interval=10, timeout=3600) -> SparkJobResult — poll loop returning final status
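
The polling loop might look roughly like the sketch below. It is simplified on purpose: it takes a get_status callable instead of item and instance IDs and returns the bare status string rather than a SparkJobResult. The injectable sleep and clock parameters are assumptions added so the loop can be unit-tested without waiting.

```python
from __future__ import annotations

import time
from typing import Callable

# Terminal statuses as listed for SparkJobResult.status in this proposal.
TERMINAL_STATUSES = {"Completed", "Failed", "Cancelled"}


def poll_until_done(
    get_status: Callable[[], str],
    interval: float = 10,
    timeout: float = 3600,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call get_status until it reports a terminal status or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout}s")
        sleep(interval)
```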

The SparkJobDefinitionV1 payload format (base64-encoded JSON in definition.parts):

{
  "executableFile": "abfss://...",
  "defaultLakehouseArtifactId": "<lakehouse-id>",
  "language": "Python",
  "environmentArtifactId": null
}
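
As an illustration, the payload could be encoded into a definition part like this. The function name is hypothetical, and the part's path and payloadType values are assumptions that should be checked against the Fabric item definition documentation.

```python
import base64
import json


def encode_definition_part(executable_file: str, lakehouse_id: str) -> dict:
    """Return one definition part holding the base64-encoded SparkJobDefinitionV1 JSON."""
    payload = {
        "executableFile": executable_file,
        "defaultLakehouseArtifactId": lakehouse_id,
        "language": "Python",
        "environmentArtifactId": None,  # serialized as JSON null
    }
    encoded = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    return {
        "path": "SparkJobDefinitionV1.json",  # assumed part name
        "payload": encoded,
        "payloadType": "InlineBase64",  # assumed payload type
    }
```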

SparkJobResult dataclass:

@dataclass
class SparkJobResult:
    status: str  # "Completed", "Failed", "Cancelled"
    start_time: str | None
    end_time: str | None
    job_url: str  # Fabric portal URL for developer inspection
    error_message: str | None

Job portal URL construction:
https://app.fabric.microsoft.com/groups/{workspace_id}/sparkJobDefinitions/{item_id}/runs/{instance_id}

Singleton SJD vs ephemeral: Use a single, named SJD (from FABRIC_TEST_SPARK_JOB_NAME or "dbt-fabric-tests") that gets created once and reused. The commandLineArguments change per invocation via the job run API.

5. tests/spark_remote/sync.py — azcopy sync wrapper

Wraps azcopy sync via subprocess.run.

Upload (project → lakehouse):

  • Target: https://onelake.dfs.fabric.microsoft.com/{workspace_name}/{lakehouse_name}.Lakehouse/Files/dbt-fabric-tests
  • Auth: --login-type=azcli
  • Excludes: .venv;.git;__pycache__;.cache;.claude;logs;.ruff_cache;.pytest_cache;.eggs;dist;build;site;.vscode;node_modules;remote-test-results;dbt-test-artifacts
  • Pattern excludes: *.pyc;*.pyo;*.egg-info
  • --delete-destination=true

Download (artifacts ← lakehouse):

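  • Source: https://onelake.dfs.fabric.microsoft.com/{workspace_name}/{lakehouse_name}.Lakehouse/Files/dbt-test-artifacts (where the remote run writes its artifacts)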
  • Target: local remote-test-results/
  • --delete-destination=true
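
A sketch of how the upload command line might be assembled before it is passed to subprocess.run. The function and constant names are hypothetical. Authentication is left out here; the plan specifies --login-type=azcli for that.

```python
from __future__ import annotations

# Exclusion lists copied from the upload settings above.
EXCLUDE_PATHS = (
    ".venv;.git;__pycache__;.cache;.claude;logs;.ruff_cache;.pytest_cache;"
    ".eggs;dist;build;site;.vscode;node_modules;remote-test-results;dbt-test-artifacts"
)
EXCLUDE_PATTERNS = "*.pyc;*.pyo;*.egg-info"


def build_upload_command(local_dir: str, workspace_name: str, lakehouse_name: str) -> list[str]:
    """Return the azcopy sync argv for uploading the project to the lakehouse."""
    target = (
        "https://onelake.dfs.fabric.microsoft.com/"
        f"{workspace_name}/{lakehouse_name}.Lakehouse/Files/dbt-fabric-tests"
    )
    return [
        "azcopy",
        "sync",
        local_dir,
        target,
        f"--exclude-path={EXCLUDE_PATHS}",
        f"--exclude-pattern={EXCLUDE_PATTERNS}",
        "--delete-destination=true",
    ]
```

Returning an argv list rather than a shell string avoids quoting problems with the semicolon-separated exclusion lists.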

Generates requirements-remote.txt by running uv export --format requirements.txt --all-extras --group dev and writing the output to the project directory before sync. This pins every dependency to the exact version from uv.lock, ensuring the Spark job installs identical packages to the local environment.

Also generates test.env.remote from local test.env with overrides: FABRIC_TEST_AUTH=notebookutils, FABRIC_TEST_SPARK_EXEC_MODE=remote.
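
One way to derive test.env.remote from the local file, shown as a minimal sketch. The function name render_remote_env is hypothetical, and the parsing assumes plain KEY=value lines; it does not handle export prefixes or quoting.

```python
from __future__ import annotations

# Overrides named in the paragraph above.
REMOTE_OVERRIDES = {
    "FABRIC_TEST_AUTH": "notebookutils",
    "FABRIC_TEST_SPARK_EXEC_MODE": "remote",
}


def render_remote_env(local_env_text: str, overrides: dict[str, str] = REMOTE_OVERRIDES) -> str:
    """Return the env file text with override keys replaced or appended."""
    remaining = dict(overrides)
    out: list[str] = []
    for line in local_env_text.splitlines():
        key = line.split("=", 1)[0].strip()
        is_assignment = "=" in line and not line.lstrip().startswith("#")
        if is_assignment and key in overrides:
            # Replace the first occurrence and drop later duplicates of the same key.
            if key in remaining:
                out.append(f"{key}={remaining.pop(key)}")
            continue
        out.append(line)
    # Append overrides that were not present in the local file.
    out.extend(f"{k}={v}" for k, v in remaining.items())
    return "\n".join(out) + "\n"
```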

6. tests/spark_remote/spark_entry_point.py — runs inside Spark job

This is a static Python script that Fabric's Spark runtime executes. It uses the pre-exported requirements-remote.txt (generated by the sync step) to install the exact same dependency versions as the local uv.lock.

Steps:

  1. pip install -r /lakehouse/default/Files/dbt-fabric-tests/requirements-remote.txt — exact pinned deps from uv lockfile
  2. pip install -e /lakehouse/default/Files/dbt-fabric-tests --no-deps — install the project itself without resolving deps (they're already installed)
  3. Loads env from /lakehouse/default/Files/dbt-fabric-tests/test.env.remote via dotenv
  4. Sets FABRIC_TEST_SPARK_EXEC_MODE=remote
  5. Runs pytest with sys.argv[1:] as args (forwarded via commandLineArguments)
  6. Exit code preserved (but junitxml is the primary result channel back to local pytest)
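
The steps above can be sketched as follows. This is an assumption-laden outline rather than the actual script: install_commands and main are hypothetical names, and the os.chdir call assumes pytest should run from the synced project directory so that nodeids match the local layout.

```python
from __future__ import annotations

import os
import subprocess
import sys

PROJECT_DIR = "/lakehouse/default/Files/dbt-fabric-tests"


def install_commands(project_dir: str = PROJECT_DIR) -> list[list[str]]:
    """Return the two pip invocations: pinned dependencies, then the project itself."""
    pip = [sys.executable, "-m", "pip", "install"]
    return [
        pip + ["-r", f"{project_dir}/requirements-remote.txt"],
        pip + ["-e", project_dir, "--no-deps"],
    ]


def main(argv: list[str]) -> int:
    for cmd in install_commands():
        subprocess.run(cmd, check=True)
    # Imported only after installation, since both come from the pinned requirements.
    import pytest
    from dotenv import load_dotenv

    load_dotenv(f"{PROJECT_DIR}/test.env.remote")
    os.environ["FABRIC_TEST_SPARK_EXEC_MODE"] = "remote"
    os.chdir(PROJECT_DIR)  # assumption: run from the synced project root
    return int(pytest.main(argv))


# The real script would end with: sys.exit(main(sys.argv[1:]))
```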

Auth inside Spark: notebookutils (already supported by FabricTokenProvider). The job uses starter pools; no Fabric Environment is needed, which keeps startup fast.

7. tests/spark_remote/orchestrator.py — coordination layer

Ties together sync, job submission, and artifact retrieval:

class RemoteTestOrchestrator:
    @classmethod
    def from_env(cls) -> "RemoteTestOrchestrator":
        """Construct from environment variables (same ones as test.env)."""
        ...

    def sync_project(self) -> None:
        """Sync local project to lakehouse. Generates requirements-remote.txt and test.env.remote."""
        ...

    def run_spark_job(self, pytest_args: list[str]) -> SparkJobResult:
        """Submit job, print portal URL, poll until completion."""
        ...

    def download_results(self) -> Path | None:
        """Download junitxml + artifacts from lakehouse. Returns local path to results.xml or None."""
        ...

Developer UX during execution:

  • Print: Remote execution: syncing project to lakehouse...
  • Print: Job URL: <portal_url>
  • Print periodic status: [1:30] Running...
  • On completion: Completed (2m15s). Downloading results...

8. Peripheral file changes

test.env.sample — add new commented vars:

# Remote Spark test execution (optional)
#FABRIC_TEST_SPARK_EXEC_MODE=        # "remote" or "mounted"; empty = local (default)
#FABRIC_TEST_ONELAKE_PATH=           # required for "mounted" mode (local mount path)
#FABRIC_TEST_SPARK_JOB_NAME=dbt-fabric-tests  # Spark Job Definition display name

.gitignore — add:

dbt-test-artifacts/
remote-test-results/
test.env.remote
requirements-remote.txt

New file tree

tests/spark_remote/
├── __init__.py
├── conftest_plugin.py        # pytest_runtestloop implementation
├── result_reporter.py        # junitxml → TestReport mapping
├── orchestrator.py           # ties sync + job client together
├── spark_job_client.py       # Fabric REST API for Spark Job Definitions
├── sync.py                   # azcopy wrapper
└── spark_entry_point.py      # executed inside Spark job

Mode B: mounted lakehouse workflow

No orchestrator needed. The conftest fixture overrides handle everything:

# Mount lakehouse (OneLake file explorer on Windows, or mount command)
git worktree add /path/to/mounted/lakehouse/Files/dbt-fabric-tests -b test-session
cd /path/to/mounted/lakehouse/Files/dbt-fabric-tests
cp /path/to/dbt-fabric/test.env .

# Add to test.env:
# FABRIC_TEST_SPARK_EXEC_MODE=mounted
# FABRIC_TEST_ONELAKE_PATH=/path/to/mounted/lakehouse/Files

uv sync --all-extras --all-groups
uv run pytest --de -k "TestBasicFabricSpark"
# Logs and project files are on OneLake, visible in Fabric portal

Implementation order

  1. tests/spark_remote/__init__.py + spark_job_client.py — API wrapper (can be tested independently)
  2. tests/spark_remote/sync.py — azcopy wrapper
  3. tests/spark_remote/spark_entry_point.py — static entry point script
  4. tests/spark_remote/result_reporter.py — junitxml parsing + TestReport construction
  5. tests/spark_remote/conftest_plugin.py — the hook implementation
  6. tests/spark_remote/orchestrator.py — ties everything together
  7. tests/conftest.py — add --remote option, pytest_runtestloop hook, _on_fabric_project_base(), fixture overrides
  8. test.env.sample + .gitignore updates

Error handling

Scenario                         | Behavior
--remote without --de            | pytest.exit with error message
azcopy not on PATH               | Print install instructions URL, pytest.exit
az account show fails            | Print az login instructions, pytest.exit
Spark Job Definition exists      | Reuse it (find by name)
Job fails before pytest runs     | No junitxml → report all items as errors with Spark error message
Job completes, junitxml missing  | Report all items as errors: "entry point failed before pytest ran"
Job completes, junitxml partial  | Matched items reported normally, unmatched as errors
Job timeout (default 30min)      | Report all items as errors with timeout message + portal URL
API 429 / transient errors       | Retry with exponential backoff (3 attempts per poll)
Network error during poll        | Retry 3 times, then raise
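
The retry behavior for transient errors could be implemented along these lines. This is a sketch only: TransientError is a hypothetical marker exception that the API client would raise for HTTP 429 responses and network failures, and the one-second base delay is an assumed value.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for HTTP 429 responses and network failures."""


def call_with_backoff(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn up to `attempts` times, doubling the delay after each transient failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts - 1):
        try:
            return fn()
        except TransientError:
            sleep(base_delay * 2**attempt)
    # Final attempt: any error propagates to the caller.
    return fn()
```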

Verification

  1. conftest isolation (Mode B dry-run): Set FABRIC_TEST_SPARK_EXEC_MODE=mounted and FABRIC_TEST_ONELAKE_PATH=/tmp/test-onelake, run a single test with --de. Verify that the project, profile, and log directories are created under /tmp/test-onelake/dbt-test-artifacts/<prefix>/.
  2. Result reporter unit test: Feed a sample junitxml, verify correct TestReport objects are constructed and hooks fired.
  3. azcopy sync: Manual test with real lakehouse — verify excludes, correct remote path structure.
  4. End-to-end Mode A: uv run pytest --de --remote -k "TestBasicFabricSpark" — verify sync, job creation, polling, result display.
  5. Mode B with real mount: Run tests with mounted lakehouse, verify artifacts visible in Fabric portal.

Metadata

    Labels

    enhancement: New feature or request
    fabricspark: Related to the FabricSpark (Spark SQL / Lakehouse) adapter
