feat(prefect): Seed example flow + auto-register Prefect deployment from workspace Gitea repo #490

@stefanko-ch

Context

PR #487 introduced the examples/workspace-seeds/kestra/flows/r2-taxi-pipeline.yaml seed pattern: a YAML flow definition is POSTed to the user's Gitea workspace fork by scripts/deploy.sh on every spin-up, and system.flow-sync (a Kestra system flow registered by deploy.sh) polls Gitea every 15 min and registers flows in Kestra under the nexus-tutorials namespace. Operators land on a fresh stack and immediately see nexus-tutorials.r2-taxi-pipeline in the Kestra UI, ready to Execute.

Prefect has the same role in our stack (workflow orchestration) and shares the same UX target: a user spins up Nexus-Stack, opens https://prefect.<domain>, and sees a tutorial deployment with one click to run. Today that's missing — the Prefect stack ships without any seeded example.

The good news: most of the infrastructure already exists.

| Component | Today | Reference |
|---|---|---|
| prefect server (API + UI) | ✓ | stacks/prefect/docker-compose.yml |
| prefect-services (scheduler/events) | ✓ | same |
| prefect-worker (process pool local-pool) | ✓ | same |
| prefect-db (Postgres 16) | ✓ | same |
| ~/.netrc for Gitea HTTPS auth in the worker container | ✓ (wired at lines 84–86 of the compose) | GITEA_USERNAME / GITEA_PASSWORD env-injected |
| Worker auto-clones workspace repo into /flows/ on container start | ✓ (lines 88–94 of the compose) | GITEA_REPO_URL, REPO_NAME env-injected |
| Cloudflare-Access-gated public UI | ✓ | port 4200 → prefect.<domain> |
| ARM64-compatible image (prefecthq/prefect:3-latest) | ✓ | already pinned |

What's missing: the deployment-registration step. Prefect 3 has no equivalent of Kestra's system.flow-sync. Deployments are explicit API objects, registered via prefect deploy (CLI) or flow.deploy() (Python). Without this step, a flow file such as nyc_green_taxi_pipeline.py exists on disk under /flows/ but is invisible in the UI, and the worker never learns that there is anything to run.

This issue tracks closing that gap.

Proposed solution

Three building blocks, each modelled directly on the Kestra equivalent, plus the matching docs updates:

1. Seed file: examples/workspace-seeds/prefect/flows/nyc_green_taxi_pipeline.py

A single Python file that mirrors the Kestra seed's shape but works on a different NYC TLC dataset so both seeded examples can coexist in the same R2 bucket without clobbering each other:

| Stack | NYC TLC dataset | R2 path | DuckDB query reads |
|---|---|---|---|
| Kestra | Yellow Taxi (yellow_tripdata_2025-{01,02}.parquet) | nexus-tutorials/NYC/yellow_tripdata_*.parquet | yellow_tripdata_*.parquet only |
| Prefect | Green Taxi (green_tripdata_2025-{01,02}.parquet) | nexus-tutorials/NYC/green_tripdata_*.parquet | green_tripdata_*.parquet only |

Same CloudFront source (d37ci6vzurychx.cloudfront.net/trip-data/), same nexus-tutorials/NYC/ prefix, distinct file-name family. A student running both pipelines gets two flow runs that produce visibly different aggregates (Yellow Taxi serves Manhattan; Green Taxi covers boroughs outside Manhattan + airports — different fare distributions, different trip counts), which is itself a teaching moment ("same code shape, different data").
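The non-overlap of the two file families can be checked with a few lines of Python. This is a minimal sketch: the helper name `r2_key` is hypothetical, and `fnmatch` only approximates how a glob over the shared prefix would select objects.

```python
from fnmatch import fnmatch

PREFIX = "nexus-tutorials/NYC"
MONTHS = ["01", "02"]


def r2_key(family: str, month: str) -> str:
    """Build the object key for one monthly parquet (hypothetical helper)."""
    return f"{PREFIX}/{family}_tripdata_2025-{month}.parquet"


# Every object the two seeded pipelines would write under the shared prefix.
all_keys = [r2_key(family, m) for family in ("yellow", "green") for m in MONTHS]

# Each stats query is scoped to its own family's wildcard.
green_keys = [k for k in all_keys if fnmatch(k, f"{PREFIX}/green_tripdata_*.parquet")]
yellow_keys = [k for k in all_keys if fnmatch(k, f"{PREFIX}/yellow_tripdata_*.parquet")]

print(len(all_keys), green_keys, yellow_keys)
```

Four distinct keys come out, and neither wildcard selects the other family's files, which is the property the coexistence check in the Verification section relies on.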

Idiomatic Prefect 3:

import os

import boto3
import duckdb  # used by the elided stats query below
import httpx
from prefect import flow, get_run_logger, task


@task(retries=2)
def download(month: str) -> bytes:
    url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-{month}.parquet"
    resp = httpx.get(url, follow_redirects=True, timeout=120)
    # Raise on 4xx/5xx so retries=2 kicks in instead of uploading an error page.
    resp.raise_for_status()
    return resp.content


@task
def upload(month: str, body: bytes):
    # R2 is S3-compatible: point boto3 at the R2 endpoint.
    s3 = boto3.client(
        "s3",
        endpoint_url=os.environ["R2_ENDPOINT"],
        aws_access_key_id=os.environ["R2_ACCESS_KEY"],
        aws_secret_access_key=os.environ["R2_SECRET_KEY"],
        region_name="auto",
    )
    s3.put_object(
        Bucket=os.environ["R2_BUCKET"],
        Key=f"nexus-tutorials/NYC/green_tripdata_2025-{month}.parquet",
        Body=body,
    )


@flow(name="nyc-green-taxi-pipeline")
def nyc_green_taxi_pipeline(months: list[str] | None = None):
    months = months or ["01", "02"]
    log = get_run_logger()
    # Submit every month, then block on the upload futures so the stats
    # query below only runs once all parquets are in R2.
    uploads = [upload.submit(m, download.submit(m)) for m in months]
    for future in uploads:
        future.result()
    # ... DuckDB stats query against
    #     s3://<R2_BUCKET>/nexus-tutorials/NYC/green_tripdata_*.parquet
    log.info("─── NYC Green-Taxi 2025 — Quick-Stats ─────────")

Naming follows the Kestra precedent: kebab-case flow name (nyc-green-taxi-pipeline), explicit 2-month default to keep CloudFront egress sane on student cohorts, no schedule trigger.

Schema note: Green Taxi parquets have almost the same column set as Yellow Taxi, with two notable differences — lpep_pickup_datetime / lpep_dropoff_datetime instead of tpep_*, and trip_type (street-hail vs. dispatch). Update the DuckDB stats query accordingly so the seeded flow runs cleanly out of the box.
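To make the column difference concrete, here is a hedged sketch of how the stats query could be parameterised per file family. The helper `stats_query` and the chosen aggregates (trip count, average fare, date range) are assumptions for illustration, not the seed's actual query. Running the SQL against R2 would additionally need DuckDB's S3 configuration, which is not shown.

```python
# Pickup/dropoff column prefix per TLC file family, per the schema note above.
DATETIME_PREFIX = {"yellow": "tpep", "green": "lpep"}


def stats_query(bucket: str, family: str) -> str:
    """Return a DuckDB SQL string scoped to one file family (hypothetical helper)."""
    prefix = DATETIME_PREFIX[family]
    source = f"s3://{bucket}/nexus-tutorials/NYC/{family}_tripdata_*.parquet"
    return (
        "SELECT count(*) AS trips, "
        "round(avg(fare_amount), 2) AS avg_fare, "
        f"min({prefix}_pickup_datetime) AS first_pickup, "
        f"max({prefix}_pickup_datetime) AS last_pickup "
        f"FROM read_parquet('{source}')"
    )


# "my-bucket" is a placeholder bucket name.
print(stats_query("my-bucket", "green"))
```

Keeping the prefix in one lookup table means the Green Taxi query differs from the Yellow Taxi one only in the column names and the wildcard.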

2. Seed file: examples/workspace-seeds/prefect/prefect.yaml

A single prefect.yaml at the workspace-repo root, defining one or more deployments and a pull: step that re-clones the repo on each flow run. Idiomatic Prefect 3 — the pull: step is what gives us the live-sync behaviour Kestra has via system.flow-sync:

name: nexus-tutorials
prefect-version: 3.x

pull:
  - prefect.deployments.steps.git_clone:
      repository: "{{ $GITEA_REPO_URL }}"
      branch: main
      access_token: "{{ $GITEA_TOKEN }}"

deployments:
  - name: nyc-green-taxi-pipeline
    entrypoint: prefect/flows/nyc_green_taxi_pipeline.py:nyc_green_taxi_pipeline
    work_pool:
      name: local-pool
    tags: [nexus-stack-seed, nexus-tutorials, data-engineering]
    description: |
      Bootstrap + analyse the NYC Green-Taxi 2025 dataset on R2 (Prefect
      version of the Kestra Yellow-Taxi tutorial — both stacks ship a
      seeded flow on different TLC datasets so you can compare engines
      side-by-side without overwriting each other's R2 output).
      Edit prefect/flows/nyc_green_taxi_pipeline.py in your Gitea
      workspace repo, push, and re-run the deployment — the worker
      re-clones from Gitea on every run via the `pull:` step above.

The pull step uses Prefect Core's built-in git_clone step (no extra dependency); access_token works directly with Gitea PATs over HTTPS.
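Because the pull step reads its values from environment placeholders, a small preflight check can show which variables the worker environment still lacks. The sketch below is an assumption-laden illustration: `missing_env_vars` is a hypothetical helper, and the example URL is a placeholder. Note that the Context table lists GITEA_USERNAME, GITEA_PASSWORD, GITEA_REPO_URL and REPO_NAME as injected, so GITEA_TOKEN may need to be added to the worker environment.

```python
import re

# Matches prefect.yaml environment placeholders such as {{ $GITEA_TOKEN }}.
ENV_PLACEHOLDER = re.compile(r"\{\{\s*\$([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def missing_env_vars(yaml_text: str, env: dict[str, str]) -> list[str]:
    """List placeholder variables that are unset or empty in `env`."""
    names = sorted(set(ENV_PLACEHOLDER.findall(yaml_text)))
    return [name for name in names if not env.get(name)]


PULL_STEP = """
pull:
  - prefect.deployments.steps.git_clone:
      repository: "{{ $GITEA_REPO_URL }}"
      branch: main
      access_token: "{{ $GITEA_TOKEN }}"
"""

# Example environment with only the repo URL set (placeholder value).
example_env = {"GITEA_REPO_URL": "https://gitea.example/user/workspace.git"}
print(missing_env_vars(PULL_STEP, example_env))
```

In a real check the second argument would be `dict(os.environ)` inside the worker container.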

3. Auto-register the deployment in scripts/deploy.sh

After the existing Prefect-.env-generation block (lines 810–817), add a one-shot deployment-registration step modeled on the Kestra register_flow pattern:

  1. Wait for the Prefect API's /api/health to return 200. The probe has to account for auth: the server sits behind CF Access when reached externally, but localhost:4200 reached directly from the runner via SSH is unprotected, so probe there.
  2. Inside the running prefect-worker container, run prefect deploy --all against the cloned /flows/ directory. The worker already has ~/.netrc with Gitea creds and the repo cloned in.
  3. Capture the HTTP/CLI status; emit a ✓ line on success and a yellow warning on failure.
  4. Idempotent on re-runs: prefect deploy upserts deployments, returning success on both first-run and re-run.
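The control flow of the four steps above can be sketched as follows. The real step would live in bash inside scripts/deploy.sh; this Python version only illustrates the wait-then-deploy logic. The function names, the container name prefect-worker as a docker exec target, the /flows working directory, and the log wording are all assumptions.

```python
import subprocess
import time
import urllib.request
from typing import Callable


def api_healthy(url: str = "http://localhost:4200/api/health") -> bool:
    """Return True when the health endpoint answers 200."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status == 200
    except OSError:  # URLError, HTTPError and socket errors are OSError subclasses
        return False


def wait_until(probe: Callable[[], bool], attempts: int = 30,
               delay: float = 2.0,
               sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll `probe` up to `attempts` times, sleeping `delay` seconds between tries."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            sleep(delay)
    return False


def run_subprocess(cmd: list[str]) -> int:
    """Run a command and return its exit code without raising."""
    return subprocess.run(cmd, check=False).returncode


def register(run: Callable[[list[str]], int]) -> str:
    """Run the deploy command and map its exit code to a log line."""
    # Assumed invocation: container name and working directory are guesses.
    cmd = ["docker", "exec", "-w", "/flows", "prefect-worker",
           "prefect", "deploy", "--all"]
    code = run(cmd)
    if code == 0:
        return "✓ Prefect deployments registered"
    return f"⚠ prefect deploy exited with code {code}"


# Dry run with stand-ins: a probe that succeeds on the third poll
# and a runner that pretends the CLI exited 0.
polls = iter([False, False, True])
ready = wait_until(lambda: next(polls), attempts=5, delay=0, sleep=lambda _: None)
print(ready, register(lambda cmd: 0))
```

Against a live stack the calls would be `wait_until(api_healthy)` followed by `register(run_subprocess)`. Because prefect deploy upserts, the same call serves both first-run and re-run.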

Relevant existing helpers to reuse:

  • seed_workspace_files() in scripts/deploy.sh (~line 3607) — the Python file in step 1 lands in Gitea via this loop, no change needed.
  • RUNNER_CLEANUP_PATHS for any temp files used during the registration step.
  • Argv-safe bash -s heredoc + curl --config pattern used by Kestra/SFTPGo blocks for any token-bearing call.
  • The Kestra register_flow POST→PUT-fallback shape (~line 4280) — same idea but for prefect deploy exit-code instead of HTTP status.

4. Docs updates

| File | Change |
|---|---|
| docs/stacks/prefect.md | Add a "Seeded example deployment" section mirroring the Kestra doc: name, what it does, how to Run, how to extend months=[...] |
| examples/README.md | Add prefect/flows/ next to kestra/flows/ in the per-stack-folders table; note that prefect/prefect.yaml is also auto-seeded at repo root |
| .github/copilot-instructions.md | Extend §9 (no-schedule-trigger rule for seeded flows) to cover Prefect deployments, with the same teaching-artefact rationale |

Critical files

| File | Change | Rough size |
|---|---|---|
| NEW examples/workspace-seeds/prefect/flows/nyc_green_taxi_pipeline.py | Python flow, Prefect-3-idiomatic, NYC Green-Taxi dataset | ~80 lines |
| NEW examples/workspace-seeds/prefect/prefect.yaml | Single deployment, git_clone pull step | ~25 lines |
| scripts/deploy.sh | Wait-for-health → exec prefect deploy --all in worker container | ~40 lines |
| docs/stacks/prefect.md | New "Seeded example deployment" section | +30 lines |
| examples/README.md | Per-stack-folder table extended with prefect/flows/ and prefect/prefect.yaml | +5 lines |
| .github/copilot-instructions.md | Extend §9 to mention Prefect | +5 lines |

No changes to tofu/, services.yaml, or other stacks.

Reused existing utilities

  • seed_workspace_files() — same loop seeds Python files alongside YAML; no change.
  • The prefect-worker container's existing ~/.netrc + git clone into /flows/ (compose lines 84–94) already does what we need; the registration step just needs to run after the clone.
  • Argv-safe credential transit pattern (bash -s + base64 + curl --config).
  • RUNNER_CLEANUP_PATHS for temp files holding plaintext secrets.

Out of scope (deferred for follow-up issues)

  • Live cron-style sync (Kestra parity): Prefect has no native equivalent of system.flow-sync. Truly live re-deploys would require a custom git-sync container running prefect deploy --all on a cron — deferred to a separate issue once we have feedback on whether the deploy.sh-time registration is enough for the teaching workflow.
  • Multiple seeded deployments: starts with one (nyc-green-taxi-pipeline), mirroring Kestra's single seeded flow (r2-taxi-pipeline). Adding more (e.g. redpanda-produce-consume.py) is mechanical: drop a new file in examples/workspace-seeds/prefect/flows/, add an entry to prefect.yaml's deployments: list.
  • Webhook-triggered redeployment (Gitea push → Prefect /deployments/POST): could replace the pull: step entirely. Bigger change to Gitea's webhook config and out of scope for the parity work.
  • Prefect Cloud: this issue is about the self-hosted server in our stack only.

Verification

End-to-end smoke test after merging:

  1. Fresh path (deepest test):

    gh workflow run destroy-all.yml --ref feat/<branch> -f confirm=DESTROY
    gh workflow run initial-setup.yaml --ref feat/<branch> -f enabled_services="prefect"

    Expected log lines:

    • ✓ Seeded prefect/flows/nyc_green_taxi_pipeline.py to Gitea (existing seed loop)
    • ✓ Seeded prefect/prefect.yaml to Gitea (existing seed loop)
    • ✓ Prefect deployment 'nyc-green-taxi-pipeline' registered (deploy --all)
  2. UI verification: open https://prefect.<domain>, navigate to Deployments. The nyc-green-taxi-pipeline deployment is visible, tagged nexus-stack-seed, with the description rendered.

  3. Run verification: hit the deployment's Run button. The worker (existing local-pool) picks up the run, git_clones the repo (visible in worker logs), executes the flow:

    • 2 parquets land in R2 at s3://<R2_BUCKET>/nexus-tutorials/NYC/green_tripdata_2025-{01,02}.parquet.
    • DuckDB stats logged in the Prefect UI run-detail view (Green-Taxi 2025 trip count, avg fare, date range — distinguishable from the Kestra Yellow-Taxi numbers because Green-Taxi serves a different NYC service area).
  4. Edit-and-rerun verification: edit prefect/flows/nyc_green_taxi_pipeline.py in Gitea (e.g. add a print statement), commit, hit Run again. The new code is picked up because the pull: step re-clones — no spin-up needed. (This is the live-sync property; without the pull: step we'd need a re-deploy.)

  5. Idempotency: run gh workflow run spin-up.yml against the already-running stack. The deployment-registration step should report ✓ already exists / updated rather than failing or duplicating.

  6. Cross-stack coexistence: with both Kestra AND Prefect enabled, both seeded deployments write into the SAME s3://<R2_BUCKET>/nexus-tutorials/NYC/ directory but on DIFFERENT file families:

    • Kestra writes yellow_tripdata_2025-*.parquet
    • Prefect writes green_tripdata_2025-*.parquet
    • Each pipeline's DuckDB stats query reads only its own file family (wildcard scoped to yellow_* or green_*).

    After running both: aws s3 ls s3://<bucket>/nexus-tutorials/NYC/ --endpoint-url <r2-endpoint> shows four files total (2 Yellow + 2 Green) — proves no overwrite, both tutorials reusable side-by-side.

  7. No-schedule-trigger rule: the Prefect deployment ships without a schedule: block in prefect.yaml. Verify in the UI that the deployment shows "No schedule" — same rule as Kestra, same rationale (a seeded schedule fires on every student stack and blows up CloudFront / R2 egress quotas).
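The no-schedule rule in step 7 could also be guarded before the UI check with a simple text scan of the seed file. This is a rough sketch under stated assumptions: `has_schedule` is a hypothetical helper, it matches keys line by line instead of parsing YAML, and it would therefore also flag a line inside a multi-line description that happens to start with `schedule:`.

```python
import re

# A `schedule:` or `schedules:` key at the start of a line (optionally as a
# list item). Lines starting with `#` do not match, so comments are ignored.
SCHEDULE_KEY = re.compile(r"^\s*(?:-\s+)?schedules?\s*:", re.MULTILINE)


def has_schedule(yaml_text: str) -> bool:
    """Return True if the text contains a schedule/schedules key."""
    return bool(SCHEDULE_KEY.search(yaml_text))


SEEDED = """\
deployments:
  - name: nyc-green-taxi-pipeline
    work_pool:
      name: local-pool
"""

# Same deployment with a cron schedule appended, for contrast.
WITH_CRON = SEEDED + '    schedules:\n      - cron: "0 6 * * *"\n'

print(has_schedule(SEEDED), has_schedule(WITH_CRON))
```

The seeded file should come back negative, while any variant that carries a schedule block is caught before it reaches a student stack.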

Acceptance criteria

  • On a fresh destroy-all + initial-setup with enabled_services="prefect", the nyc-green-taxi-pipeline deployment is visible in the Prefect UI within ~30 s of deploy completion.
  • Hitting Run in the UI uploads two green_tripdata_2025-{01,02}.parquet files to R2 under nexus-tutorials/NYC/ and logs DuckDB stats in the run-detail view.
  • With Kestra also enabled, the bucket contains BOTH file families (yellow_* from Kestra, green_* from Prefect) — no overwrite.
  • Editing the flow file in Gitea + re-running picks up the change without a re-spin (pull: step works as designed).
  • No yellow warnings in the deploy log other than the existing Kestra/SFTPGo lines.
  • docs/stacks/prefect.md covers the new "Seeded example" section.
