-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstate_sync.py
More file actions
306 lines (251 loc) · 9.21 KB
/
state_sync.py
File metadata and controls
306 lines (251 loc) · 9.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# SPDX-FileCopyrightText: 2026 KustoKing / SecM8
# SPDX-License-Identifier: Apache-2.0
"""`contentops state sync push|pull|status` — durable cross-runner state.
Closes G15. The state file at ``state/state.json`` is per-clone
today, so two CI runners produce divergent state and a fresh
checkout loses the state every time.
This module wires the orphan-branch convention DESIGN §13
promised: state lives on its own branch ``state/<env>`` whose
history is independent of main, audit-trailing every state
mutation without polluting main's commit log.
Pure git plumbing (no working-tree side effects):
* ``push`` — stages the local state/state.json onto an orphan
commit on ``refs/heads/state/<env>``, then pushes to remote.
Uses ``git hash-object`` + ``git mktree`` + ``git commit-tree``
so the working tree stays clean.
* ``pull`` — fetches ``refs/heads/state/<env>`` from remote,
reads the JSON blob via ``git show``, writes to local
state/state.json.
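* ``status`` — compares the git blob hash of the local
state/state.json against the ``state.json`` blob recorded on the
local copy of ``refs/heads/state/<env>`` (no fetch is performed)
and reports whether the two are in sync.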
Concurrency is the operator's responsibility — pair this with a
``concurrency:`` group on the workflow that calls it so two
parallel applies queue rather than race. The CLI doesn't lock.
"""
from __future__ import annotations
import hashlib
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
class StateSyncError(RuntimeError):
    """Signal that a state-sync step could not be completed.

    Raised when ``git`` is missing from PATH, when a git invocation
    cannot be started, times out or exits non-zero, and when the state
    file to push does not exist.
    """
def _git() -> str:
    """Locate the ``git`` executable on PATH and return its path.

    Raises:
        StateSyncError: if no ``git`` executable can be found.
    """
    executable = shutil.which("git")
    if executable is not None:
        return executable
    raise StateSyncError("git is not on PATH")
def _run(args: list[str], *, cwd: Path) -> str:
    """Run ``git -C <cwd> <args...>`` and return its stripped stdout.

    Output is decoded explicitly as UTF-8 (undecodable bytes are
    replaced) instead of with the locale's preferred encoding, so the
    result does not depend on the platform code page and matches how
    ``_run_with_input`` decodes git output.

    Args:
        args: git sub-command and its arguments (without ``git`` itself).
        cwd: directory handed to ``git -C``.

    Returns:
        Captured stdout with leading and trailing whitespace removed.

    Raises:
        StateSyncError: if git is not on PATH, cannot be started,
            exceeds the 30 second timeout, or exits non-zero.
    """
    try:
        result = subprocess.run(
            [_git(), "-C", str(cwd), *args],
            capture_output=True,
            # Passing an encoding keeps text mode but pins the codec;
            # plain ``text=True`` would use the locale encoding.
            encoding="utf-8",
            errors="replace",
            check=False,
            timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        raise StateSyncError(f"git invocation failed: {exc}") from exc
    if result.returncode != 0:
        # Prefer stderr; some git commands report failures on stdout.
        msg = (result.stderr or result.stdout or "").strip()
        raise StateSyncError(
            f"git {' '.join(args)} failed (exit {result.returncode}): {msg}"
        )
    return result.stdout.strip()
def _run_with_input(args: list[str], *, cwd: Path, stdin: str) -> str:
    """Run ``git -C <cwd> <args...>`` feeding ``stdin`` to the process.

    The input is sent as UTF-8 bytes rather than text so that no
    newline translation happens on Windows; a translated line ending
    would leave a trailing carriage return that ``mktree`` treats as
    part of the filename.

    Args:
        args: git sub-command and its arguments (without ``git`` itself).
        cwd: directory handed to ``git -C``.
        stdin: text written to the process's standard input.

    Returns:
        Stdout decoded as UTF-8 (undecodable bytes replaced), stripped.

    Raises:
        StateSyncError: if git is not on PATH, cannot be started,
            exceeds the 30 second timeout, or exits non-zero.
    """
    try:
        command = [_git(), "-C", str(cwd), *args]
        proc = subprocess.run(
            command,
            input=stdin.encode("utf-8"),
            capture_output=True,
            check=False,
            timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        raise StateSyncError(f"git invocation failed: {exc}") from exc

    if proc.returncode == 0:
        return proc.stdout.decode("utf-8", errors="replace").strip()

    # Failure: surface stderr, falling back to stdout when stderr is empty.
    raw_message = proc.stderr or proc.stdout or b""
    msg = raw_message.decode("utf-8", errors="replace").strip()
    raise StateSyncError(
        f"git {' '.join(args)} failed (exit {proc.returncode}): {msg}"
    )
def _ref_name(env: str) -> str:
    """Build the full ref name of the state branch for ``env``.

    The environment name is trimmed and lower-cased; a blank name maps
    to ``default``. No further validation of the name is done here.

    Example: ``'production'`` -> ``'refs/heads/state/production'``.
    """
    normalized = env.strip().lower()
    if not normalized:
        normalized = "default"
    return "refs/heads/state/" + normalized
@dataclass
class PushResult:
    """Outcome of a ``push`` call."""

    # Environment name exactly as the caller passed it.
    env: str
    # Full ref that was updated, e.g. ``refs/heads/state/<env>``.
    ref: str
    # SHA of the commit created for this push.
    commit_sha: str
    # True only when the push to the remote was attempted and succeeded.
    pushed_remote: bool
    # Explanation when the remote push failed; empty otherwise.
    detail: str = ""
@dataclass
class PullResult:
    """Outcome of a ``pull`` call."""

    # Environment name exactly as the caller passed it.
    env: str
    # Full ref that was read, e.g. ``refs/heads/state/<env>``.
    ref: str
    # True when the fetch from the remote was attempted and succeeded.
    fetched: bool
    # Path the state was written to, or None when nothing was written.
    written_path: Path | None
    # Human-readable description of what happened.
    detail: str = ""
@dataclass
class StatusResult:
    """Outcome of a ``status`` call."""

    # Environment name exactly as the caller passed it.
    env: str
    # Full ref that was inspected, e.g. ``refs/heads/state/<env>``.
    ref: str
    # Whether the local state file exists.
    local_present: bool
    # Git blob hash of the local state file, or None when it is absent.
    local_sha: str | None
    # Whether ``state.json`` could be resolved on the ref.
    remote_present: bool
    # Blob hash of ``state.json`` on the ref, or None when unresolved.
    remote_sha: str | None
    # True only when both sides are present and their hashes are equal.
    in_sync: bool
# ---------------------------------------------------------------------------
# Push
# ---------------------------------------------------------------------------
def push(
    env: str,
    state_file: Path,
    *,
    repo: Path,
    remote: str = "origin",
    push_remote: bool = True,
    actor: str = "pipeline",
) -> PushResult:
    """Publish the local state file as a commit on ``refs/heads/state/<env>``.

    The file is written into git's object store, wrapped in a tree that
    contains only ``state.json``, and committed without a parent. The
    local ref is then pointed at that commit, so after each push the
    ref holds just the latest snapshot. The local ref is updated even
    when ``push_remote=False``, so unit tests can exercise the plumbing
    without a real remote.

    Args:
        env: environment name; selects the ref via ``_ref_name``.
        state_file: path of the state file to publish.
        repo: repository directory the git commands run in.
        remote: name of the remote to push to.
        push_remote: when False, only the local ref is updated.
        actor: label recorded in the commit message.

    Returns:
        A ``PushResult``. ``pushed_remote`` is True only if the remote
        push succeeded; when it fails the error is reported through
        ``detail`` rather than raised.

    Raises:
        StateSyncError: if ``state_file`` does not exist or any of the
            local git steps fails.
    """
    if not state_file.is_file():
        raise StateSyncError(f"state file not found: {state_file}")
    ref = _ref_name(env)

    # git is invoked with ``-C repo``, so a relative path would be
    # resolved against the repository rather than the caller's working
    # directory (which is what the existence check above used).
    source = state_file.resolve()

    # 1. Store the file as a blob. ``--no-filters`` keeps the bytes
    #    as-is (no clean filter / end-of-line conversion) so the blob
    #    hash equals the hash of the file's raw content.
    blob_sha = _run(
        ["hash-object", "-w", "--no-filters", "--", str(source)],
        cwd=repo,
    )

    # 2. Build a tree containing just state.json -> blob.
    tree_input = f"100644 blob {blob_sha}\tstate.json\n"
    tree_sha = _run_with_input(["mktree"], cwd=repo, stdin=tree_input)

    # 3. Commit the tree with no parent (orphan).
    message = f"[state] {env} sync from {actor}"
    commit_sha = _run_with_input(
        ["commit-tree", tree_sha, "-m", message],
        cwd=repo, stdin="",
    )

    # 4. Point the local ref at the new commit.
    _run(["update-ref", ref, commit_sha], cwd=repo)

    pushed = False
    detail = ""
    if push_remote:
        try:
            # Forced because the new commit does not descend from
            # whatever the remote ref currently points at.
            _run(["push", "--force", remote, f"{ref}:{ref}"], cwd=repo)
        except StateSyncError as exc:
            # The local update succeeded, so don't re-raise; the caller
            # decides whether a failed remote push is fatal.
            detail = f"local ref updated; remote push failed: {exc}"
        else:
            pushed = True

    return PushResult(
        env=env, ref=ref, commit_sha=commit_sha,
        pushed_remote=pushed, detail=detail,
    )
# ---------------------------------------------------------------------------
# Pull
# ---------------------------------------------------------------------------
def _read_state_blob(ref: str, *, repo: Path) -> bytes:
    """Return the exact bytes of ``state.json`` as stored on ``ref``."""
    spec = f"{ref}:state.json"
    try:
        result = subprocess.run(
            [_git(), "-C", str(repo), "show", spec],
            capture_output=True, check=False, timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        raise StateSyncError(f"git invocation failed: {exc}") from exc
    if result.returncode != 0:
        msg = (result.stderr or result.stdout or b"").decode(
            "utf-8", errors="replace",
        ).strip()
        raise StateSyncError(
            f"git show {spec} failed (exit {result.returncode}): {msg}"
        )
    return result.stdout


def pull(
    env: str,
    state_file: Path,
    *,
    repo: Path,
    remote: str = "origin",
    fetch_remote: bool = True,
) -> PullResult:
    """Write the state stored on ``refs/heads/state/<env>`` to ``state_file``.

    Tolerates a missing ref (nothing is written and ``written_path`` is
    None) so a first-run workflow can call ``pull`` unconditionally.
    The blob is copied byte-for-byte, so the written file hashes to the
    same git blob id as the stored ``state.json``.

    Args:
        env: environment name; selects the ref via ``_ref_name``.
        state_file: destination path; parent directories are created.
        repo: repository directory the git commands run in.
        remote: name of the remote to fetch from.
        fetch_remote: when False, only the local copy of the ref is read.

    Returns:
        A ``PullResult``. ``fetched`` is True only if the fetch was
        attempted and succeeded; a failed fetch is not fatal and is
        mentioned in ``detail``.
    """
    ref = _ref_name(env)
    fetched = False
    fetch_note = ""
    if fetch_remote:
        try:
            # Leading "+" force-updates the local ref to the remote's.
            _run(["fetch", remote, f"+{ref}:{ref}"], cwd=repo)
        except StateSyncError as exc:
            # Typically the remote has no state ref yet, so this stays
            # non-fatal; the reason is kept so the caller can tell that
            # apart from other fetch failures.
            fetch_note = f" (fetch from {remote} failed: {exc})"
        else:
            fetched = True

    # Check if the local ref exists.
    try:
        _run(["rev-parse", "--verify", ref], cwd=repo)
    except StateSyncError:
        return PullResult(
            env=env, ref=ref, fetched=fetched, written_path=None,
            detail=f"ref {ref} does not exist locally — empty state{fetch_note}",
        )

    # Read the state.json blob from the ref, untouched.
    try:
        body = _read_state_blob(ref, repo=repo)
    except StateSyncError as exc:
        return PullResult(
            env=env, ref=ref, fetched=fetched, written_path=None,
            detail=f"ref {ref} has no state.json: {exc}{fetch_note}",
        )

    state_file.parent.mkdir(parents=True, exist_ok=True)
    # Bytes, not text: no stripping, no added newline and no newline
    # translation, so the file matches the stored blob exactly.
    state_file.write_bytes(body)
    return PullResult(
        env=env, ref=ref, fetched=fetched, written_path=state_file,
        detail=f"wrote {state_file}{fetch_note}",
    )
# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------
def _file_blob_sha(path: Path) -> str:
    """Return the hex SHA-1 git would give a blob with this file's bytes.

    The digest covers git's blob header (``blob <size>`` followed by a
    NUL byte) and then the raw file content. SHA-1 is used purely to
    reproduce git's object id, not as a security primitive, which is
    what ``usedforsecurity=False`` declares to scanners such as bandit
    (B324).
    """
    payload = path.read_bytes()
    digest = hashlib.sha1(usedforsecurity=False)
    digest.update(b"blob %d\0" % len(payload))
    digest.update(payload)
    return digest.hexdigest()
def status(
    env: str,
    state_file: Path,
    *,
    repo: Path,
) -> StatusResult:
    """Report whether the local state file matches the state ref.

    Compares the git blob hash of ``state_file`` with the hash that
    ``refs/heads/state/<env>:state.json`` resolves to in ``repo``. No
    fetch is performed here, so the ref is whatever the local
    repository currently holds.

    Returns:
        A ``StatusResult``; ``in_sync`` is True only when both sides
        exist and their hashes are equal.
    """
    ref = _ref_name(env)

    if state_file.is_file():
        local_present, local_sha = True, _file_blob_sha(state_file)
    else:
        local_present, local_sha = False, None

    try:
        remote_sha = _run(["rev-parse", f"{ref}:state.json"], cwd=repo)
    except StateSyncError:
        # Any git failure here is treated as "no state on the ref".
        remote_present, remote_sha = False, None
    else:
        remote_present = True

    return StatusResult(
        env=env,
        ref=ref,
        local_present=local_present,
        local_sha=local_sha,
        remote_present=remote_present,
        remote_sha=remote_sha,
        in_sync=local_present and remote_present and local_sha == remote_sha,
    )
# Public API of this module: the error type, the three result
# records, and the three operations.
__all__ = [
    "StateSyncError",
    "PushResult",
    "PullResult",
    "StatusResult",
    "push",
    "pull",
    "status",
]