Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/benchmark/scenario_suite_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ def scenario_dir_for_id(scenario_root: Path, scenario_id: str) -> Path:
"""Return the expected scenario folder path for a scenario id."""
return scenario_root / f"scenario_{scenario_id}"

def export_couchdb_state(scenario_id: str, scenario_root: Path,
                         out_dir: Path, dry_run: bool) -> None:
    """Export the current CouchDB state for one scenario to a JSON file.

    Runs ``src/couchdb/init_data.py --export <dest>`` in a subprocess whose
    working directory is ``REPO_ROOT`` and whose environment carries
    ``SCENARIOS_DATA_DIR``. The snapshot is written to
    ``<out_dir>/couchdb_state_<scenario_id>.json``.

    Args:
        scenario_id: Scenario identifier; used only to name the output file.
        scenario_root: Scenario data directory, exported to the child as
            ``SCENARIOS_DATA_DIR``.
        out_dir: Directory that receives the snapshot file. Created on a
            real (non-dry) run if it does not exist yet.
        dry_run: When true, only print the command; nothing is created or run.

    Raises:
        subprocess.CalledProcessError: If the export script exits non-zero.
    """
    # The child runs with cwd=REPO_ROOT, so a relative path would be resolved
    # against a different directory than the caller's. Pin both paths to the
    # caller's working directory before handing them over.
    dest = Path(out_dir).resolve() / f"couchdb_state_{scenario_id}.json"
    env = os.environ.copy()
    env["SCENARIOS_DATA_DIR"] = str(Path(scenario_root).resolve())

    cmd = [sys.executable, "src/couchdb/init_data.py", "--export", str(dest)]
    print(" ".join(cmd))
    if dry_run:
        return

    # The export script opens dest for writing; make sure its directory exists.
    dest.parent.mkdir(parents=True, exist_ok=True)
    subprocess.run(cmd, check=True, cwd=str(REPO_ROOT), env=env)

def read_question(scenario_root: Path, scenario_id: str) -> str:
"""Read question.txt for a scenario."""
Expand Down Expand Up @@ -360,6 +370,19 @@ def main() -> None:
trajectory_dir=trajectory_dir,
dry_run=args.dry_run,
)
# capture post-run state right after the agent finishes
try:
export_couchdb_state(
scenario_id=scenario_id,
scenario_root=args.scenario_root,
out_dir=trajectory_dir, # or a dedicated states_root
dry_run=args.dry_run,
)
except Exception as exc:
print(
f"warning: state export failed for scenario {scenario_id}: {exc}",
file=sys.stderr,
)
except Exception as exc:
print(
f"error: scenario {scenario_id} failed for method {method.agent_name}: {exc}",
Expand Down
34 changes: 34 additions & 0 deletions src/couchdb/init_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,31 @@ def init_data(
)
return results

def export_state(
    dest=None,
    managed_only: bool = True,
    include_design: bool = False,
) -> dict:
    """Snapshot current CouchDB state as ``{db: [docs]}``.

    Args:
        dest: Optional output path. An existing directory receives one
            ``<db>.json`` per database; any other path is written as a single
            combined JSON file (its parent directory is created if missing).
            ``None`` skips writing entirely.
        managed_only: When true, snapshot the databases returned by
            ``all_databases()`` (the default-manifest collections); when
            false, snapshot everything ``loader.list_databases()`` reports.
        include_design: Forwarded to ``loader.export_database``.

    Returns:
        The snapshot mapping, whether or not it was also written to disk.
    """
    targets = all_databases() if managed_only else loader.list_databases()
    state = {db: loader.export_database(db, include_design=include_design)
             for db in targets}

    if dest is None:
        return state

    if os.path.isdir(dest):
        # Directory target: one file per database.
        for db, docs in state.items():
            with open(os.path.join(dest, f"{db}.json"), "w", encoding="utf-8") as f:
                json.dump(docs, f, indent=2, sort_keys=True)
        return state

    # Single-file target: create the containing directory first so a fresh
    # output location does not fail with FileNotFoundError.
    os.makedirs(os.path.dirname(os.path.abspath(dest)), exist_ok=True)
    with open(dest, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2, sort_keys=True)
    return state

def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
Expand Down Expand Up @@ -181,13 +206,22 @@ def main() -> None:
action="store_true",
help="With --reset/--reset-only: drop only the default-manifest collections.",
)
p.add_argument("--export", metavar="DEST",
help="Snapshot current CouchDB state to DEST (dir or .json file) and exit.")

a = p.parse_args()

if a.reset_only:
for db in reset(managed_only=a.managed_only):
print(f"dropped\t{db}")
return

if a.export:
state = export_state(dest=a.export, managed_only=a.managed_only)
for db, docs in state.items():
print(f"exported\t{db}\t{len(docs)}")
return

for key, (db, n) in init_data(
a.scenario, force=not a.reuse, reset_first=a.reset, managed_only=a.managed_only
).items():
Expand Down
25 changes: 25 additions & 0 deletions src/couchdb/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,31 @@ def drop_database(db) -> int:
r.raise_for_status()
return r.status_code

def export_database(db, include_design=False) -> list:
    """Fetch every document of *db* through ``_all_docs?include_docs=true``.

    Each returned document has its ``_rev`` removed so that snapshots compare
    equal across runs. Documents whose ``_id`` starts with ``_design/`` are
    left out unless *include_design* is true. A 404 response (database does
    not exist) yields an empty list; any other HTTP error is raised by
    ``raise_for_status()``.
    """
    response = requests.get(
        _db_url(db, "_all_docs"),
        params={"include_docs": "true"},
        auth=_AUTH,
        timeout=60,
    )
    # A database that was never created is reported as an empty snapshot.
    if response.status_code == 404:
        return []
    response.raise_for_status()

    rows = response.json().get("rows", [])
    snapshot = []
    for document in (row.get("doc") for row in rows):
        if document is None:
            continue
        is_design = document["_id"].startswith("_design/")
        if is_design and not include_design:
            continue
        # _rev changes on every write; drop it for deterministic comparison.
        document.pop("_rev", None)
        snapshot.append(document)
    return snapshot

def _ensure_db(db, drop):
if requests.head(_db_url(db), auth=_AUTH, timeout=10).status_code == 200:
Expand Down
2 changes: 1 addition & 1 deletion src/couchdb/scenarios_data/default/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
"shared/iot/hydraulic_pump_1.json"
],
"vibration": "shared/iot/motor_01.json",
"failurecode": "shared/failure_code/failure_code_sample.csv"
"failure_code": "shared/failure_code/failure_code_sample.csv"
}
Empty file added src/couchdb/tests/__init__.py
Empty file.
52 changes: 52 additions & 0 deletions src/couchdb/tests/test_export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Export tests — no live CouchDB; requests is mocked."""
import json, os, sys, tempfile, types
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))

import couchdb.loader as loader
import couchdb.init_data as init_data


class _Resp:
def __init__(self, payload, code=200):
self._p, self.status_code = payload, code
def raise_for_status(self):
if self.status_code >= 400:
raise RuntimeError(f"HTTP {self.status_code}")
def json(self):
return self._p


def _patch(get):
    """Route the loader's HTTP GETs through the callable *get*.

    Swaps ``loader.requests`` for a stub namespace that exposes only ``get``
    and points ``init_data.loader`` at this loader module. The replacements
    are not undone afterwards.
    """
    fake_requests = types.SimpleNamespace(get=get)
    init_data.loader = loader
    loader.requests = fake_requests


def test_strips_rev_and_design_docs():
    """Design docs are dropped and ``_rev`` is removed from regular docs."""
    def fake_get(url, **kwargs):
        rows = [
            {"doc": {"_id": "_design/x", "views": {}}},
            {"doc": {"_id": "wo:1", "_rev": "1-a", "status": "WAPPR"}},
        ]
        return _Resp({"rows": rows})

    _patch(fake_get)
    exported = loader.export_database("workorder")
    expected = [{"_id": "wo:1", "status": "WAPPR"}]
    assert exported == expected


def test_include_design_keeps_design_doc():
    """With include_design=True the design document is returned unchanged."""
    _patch(lambda url, **k: _Resp({"rows": [{"doc": {"_id": "_design/x"}}]}))
    # Pin the exact result; a bare truthiness check would accept any
    # non-empty list, including one holding the wrong document.
    assert loader.export_database("workorder", include_design=True) == [
        {"_id": "_design/x"}
    ]


def test_missing_db_returns_empty_not_error():
    """A 404 for the database produces an empty export rather than an error."""
    def not_found(url, **kwargs):
        return _Resp({}, 404)

    _patch(not_found)
    exported = loader.export_database("ghost")
    assert exported == []


def test_export_state_excludes_system_dbs_and_writes_file():
    """managed_only=False skips ``_users`` and the file written matches the result."""
    def get(url, **k):
        if url.endswith("/_all_dbs"):
            return _Resp(["workorder", "_users"])
        return _Resp({"rows": [{"doc": {"_id": "a:1", "_rev": "1-z", "v": 1}}]})

    _patch(get)
    # TemporaryDirectory removes the scratch dir afterwards, and the explicit
    # ``with open`` closes the handle instead of leaving it to the GC.
    with tempfile.TemporaryDirectory() as tmp:
        fp = os.path.join(tmp, "state.json")
        state = init_data.export_state(dest=fp, managed_only=False)
        with open(fp) as f:
            written = json.load(f)
    assert "_users" not in state and list(state) == ["workorder"]
    assert written == state