[FEA]: tee streaming and run_step #217

Description

@rice-riley

Summary

Port the largest single concern in agent/skyhook-agent/src/skyhook_agent/controller.py — the asyncio-based subprocess streamer (_stream_process, tee, _run) and the orchestration in run_step (lines ~81–346). This is the path that actually executes a Step: it spawns the chroot-exec subcommand from #216, tees its stdout/stderr through a per-line timestamper into both the parent stdout/stderr and a per-step log file, honors the SKYHOOK_AGENT_BUFFER_LIMIT, supports SKYHOOK_AGENT_WRITE_LOGS=false to disable disk writes, and resolves env:FOO argument interpolation.

This is the biggest PR in the rewrite (~600 LOC of source + tests). It's still self-contained — every line maps 1:1 to a Python line.

Depends on #213 (types), #215 (filesystem helpers), #216 (chroot-exec subcommand).

Motivation

This module is the most observable surface of the agent: every line in kubectl logs <agent-pod> flows through it, and every *.log file under /var/log/skyhook/{package}/{version}/ is written by it. Operators have learned to recognize the [out] / [err] prefixes and the per-line ISO-8601 timestamp shape — pre-cutover dashboards, e2e log greps under k8s-tests/operator-agent/, and customer log-aggregation pipelines all depend on the format.

Behavior parity for log lines is not optional in this PR.

Feature description

A new internal/runner package providing:

  • Tee(ctx, chrootDir, cmd, stdoutPath, stderrPath, copyDir, opts) (exitCode int, err error) — the streaming subprocess wrapper.
  • RunStep(ctx, step, chrootDir, copyDir, cfg) (failed bool, err error) — the per-step orchestration.

Plus their unit tests, which lean on a fake chroot-exec subprocess (a tiny Go test helper that prints a known sequence to stdout/stderr).

Proposed direction

1. Per-line timestamper

Port _stream_process to a streamCopier(ctx, src io.Reader, sinks []io.Writer, label string, bufLimit int):

  • Read from src in chunks of bufLimit bytes (default 8192, env override SKYHOOK_AGENT_BUFFER_LIMIT).
  • Decode with unicode/utf8 validation, replacing invalid sequences with U+FFFD (Python uses errors="replace").
  • On each chunk:
    • Compute t := time.Now().Format(...) matching Python's datetime.now().isoformat() (microsecond precision, no timezone — that's what datetime.now() gives).
    • First chunk emitted prepends {label}{t} .
    • Replace every \n with \n{label}{t} so each new line gets its own timestamp.
    • Write to every sink, flushing after each write.
  • On error reading the stream, emit {label}{t} ERROR reading stream: {err}\n to all sinks and return.

2. Tee

Port tee (lines 160–201):

  • If writeLogs, open stdoutPath and stderrPath for write; otherwise use a discardWriter (Python's NullWriter).
  • If writeCmds, write strings.Join(cmd, " ") + "\n" to stdout and the stdout sink.
  • Write a temp control file under os.TempDir() containing {cmd, no_chmod, env, copy_dir}.
  • exec.CommandContext(ctx, "/proc/self/exe", "chroot-exec", controlFile, chrootDir): re-exec the agent binary's chroot-exec subcommand from #216. Use /proc/self/exe so we don't depend on $PATH.
  • Pipe stdout/stderr through streamCopier to [os.Stdout, stdoutSink] / [os.Stderr, stderrSink] with [out] / [err] labels respectively.
  • Use errgroup.Group to wait for both copiers + cmd.Wait().
  • Return the exit code, negative on signal. Go's cmd.ProcessState.ExitCode() returns -1 when the child is killed by a signal, whereas Python returns -signum. Match Python: take ws := cmd.ProcessState.Sys().(syscall.WaitStatus), and if ws.Signaled() is true return -int(ws.Signal()); otherwise return ExitCode(). The NodeRestart -15 special case in #218 depends on this exact convention.

3. _run synchronous wrapper

Trivial in Go (no asyncio): just Tee(...) directly. Keep an internal run function so #218 and #219 have a stable seam to call.

4. RunStep

Port run_step (lines 279–346):

  • Resolve env:FOO arguments — for each arg starting with env:, look up the env var; if missing, collect into errors and at the end print all and return failed=true.
  • time.Sleep(time.Second) (Python does this — keep it; comment as // why: legacy guard against fast-cycling step pods, see git blame for ...). Confirm the comment is accurate by checking blame; if not, drop the sleep and document in the PR description.
  • If SKYHOOK_AGENT_WRITE_LOGS, compute logFile := flags.GetLogFile(...) (from #215); else logFile = "".
  • Build env: start from step.Env, then add STEP_ROOT and SKYHOOK_DIR.
  • Call Tee(...) with logFile, logFile + ".err", etc.
  • After Tee returns, if SKYHOOK_AGENT_WRITE_LOGS, call flags.CleanupOldLogs(...).
  • If returncode not in step.Returncodes, print FAILED: {path} {args} {rc} and return failed=true.
  • Else print SUCEEDED: {path} {args} (yes, that's the typo Python emits — preserve it; operator e2e logs grep for it). Add a // why: comment naming the operator e2e dependency on the typo.

5. Tests

Port the run-step / tee cases from agent/skyhook-agent/tests/test_controller.py and agent/skyhook-agent/tests/test_steps.py:

  • Sink fan-out: writes appear in both the parent stream and the file.
  • [out] / [err] labeling.
  • Per-line timestamp prefix on every newline-delimited line.
  • writeLogs=false produces no log file.
  • env:FOO resolution success and failure.
  • STEP_ROOT / SKYHOOK_DIR injected.
  • Returncode-set membership decides failure.
  • "SUCEEDED" typo present in success path.
  • CleanupOldLogs called only when writeLogs=true.

For the subprocess parts, use a small testdata/fake-chroot-exec.go that compiles to a binary and prints scripted output. Avoid relying on /proc/self/exe in unit tests — make the binary path injectable via an internal seam.

Scope boundaries

In scope:

  • Streaming, tee, run_step, env interpolation.

Out of scope:

Acceptance criteria

  • All ported test cases pass.
  • A test asserts that a known stdout sequence produces a byte-equal log file when compared to Python's output for the same input (golden test).
  • A test asserts that SKYHOOK_AGENT_BUFFER_LIMIT controls the chunk size.
  • A test asserts that the negative exit code path for SIGTERM returns -15 (not -1 or 255) so the NodeRestart special case in #218 works.
  • The "SUCEEDED" typo is preserved in the success path.
  • No new third-party deps beyond what #213 already brought in (use stdlib os/exec plus golang.org/x/sync/errgroup, which is not part of the standard library).

Open questions

  • Timestamp precision: Python datetime.now().isoformat() emits microsecond precision (2026-05-02T21:51:23.456789). Go time.Now().Format(time.RFC3339Nano) emits nanosecond precision plus a timezone offset. Use a custom format string "2006-01-02T15:04:05.000000" for byte parity. Note that isoformat() omits the fractional part entirely when the microsecond field is 0, so the fixed layout differs from Python in that one case. Confirm this is what operator log greps expect.
  • The time.sleep(1) in Python's run_step (time.Sleep(time.Second) in Go): keep or drop? See proposed direction §4. Lean toward dropping if blame doesn't justify it, but call it out in the PR description either way.
  • Plumb the reconcile-style ctx through to support cancellation (the SIGTERM path in #219)? Recommend yes from day one in this PR; it is easier to add now than to retrofit.

References (codebase)

Alternatives considered

  • Drop the per-line timestamp / [out] [err] labels and rely on kubectl logs --timestamps. Rejected — breaks the on-disk log files which operators consume directly via kubectl skyhook node logs.
  • Use plain fan-out only (e.g. the standard library's io.MultiWriter) or a third-party log multiplexer. Rejected: we need per-line transformation, not just fan-out.

Metadata

Labels

component/agent: Skyhook agent (package executor)