Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,22 @@ The `plf.experiment` module provides powerful tools for managing your PPL databa
* **`delete_ppl(ppls)`**: Permanently deletes a pipeline from the archive.
* **`stop_running()`**: Gracefully stops a currently running pipeline after its current iteration completes.

---
### Pipeline Configuration Export Utilities

You can export pipeline (PPL) configurations from the internal database into standard JSON or YAML formats for version control or inspection.

```python
from plf.experiment import export_ppl_to_yaml, export_ppl_to_json

# Export a single pipeline config to YAML
export_ppl_to_yaml("ppl_data_run_001", output_path="./configs/ppl_data_run_001.yaml")

# Export a single pipeline config to JSON
export_ppl_to_json("ppl_data_run_001", output_path="./configs/ppl_data_run_001.json")

# Export all active pipelines at once using a wildcard
export_ppl_to_yaml("*", output_dir="./configs/")
```

## 📜 License

Expand Down
115 changes: 113 additions & 2 deletions src/plf/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
"delete_ppl",
"transfer_ppl",
"group_by_common_columns",
'filter_ppls'
'filter_ppls',
"export_ppl_to_json",
"export_ppl_to_yaml"
]


def get_ppls() -> List[str]:
"""
Retrieves a list of all pipeline IDs from the database.
Expand Down Expand Up @@ -481,3 +482,113 @@ def group_by_common_columns(
for k, colset in cols.items():
group_map[colset].append(k)
return group_map

def _get_ppl_data(pplid: str) -> dict:
    """Fetch a single pipeline row from ``ppls.db`` and return it as a dict.

    Column names are read from ``PRAGMA table_info(ppls)`` and paired with the
    first row whose ``pplid`` matches. String values that start with ``{`` or
    ``[`` are decoded with ``json.loads``; a value that fails to decode is kept
    as the raw string.

    Args:
        pplid: Pipeline ID to look up.

    Returns:
        Mapping of column name to (possibly JSON-decoded) value.

    Raises:
        ValueError: If no row has the given ``pplid``.
    """
    db = Db(db_path=f"{get_shared_data()['data_path']}/ppls.db")
    try:
        # Index 1 of each table_info row is the column name.
        columns_info = db.query("PRAGMA table_info(ppls)")
        column_names = [col[1] for col in columns_info]

        # The ID is embedded in a single-quoted SQL literal, so double any
        # embedded quote to keep it from terminating the literal early.
        # NOTE(review): switch to a bound parameter if Db.query accepts one.
        quoted_id = str(pplid).replace("'", "''")
        rows = db.query(f"SELECT * FROM ppls WHERE pplid = '{quoted_id}'")
    finally:
        # Close exactly once, on success and on failure alike.
        db.close()

    if not rows:
        raise ValueError(f"Pipeline with ID '{pplid}' does not exist.")

    data = dict(zip(column_names, rows[0]))

    # Best-effort decoding of JSON-looking strings back into nested objects.
    for key, value in data.items():
        if isinstance(value, str) and value.startswith(("{", "[")):
            try:
                data[key] = json.loads(value)
            except (ValueError, RecursionError):
                # Not valid JSON after all; keep the original string.
                pass
    return data

def _get_all_ppl_data() -> list:
    """Fetch every row of the ``ppls`` table as a list of dicts.

    Column names are read from ``PRAGMA table_info(ppls)`` and paired with each
    row. String values that start with ``{`` or ``[`` are decoded with
    ``json.loads``; a value that fails to decode is kept as the raw string.

    Returns:
        One dict per row, mapping column name to (possibly JSON-decoded) value.
    """
    db = Db(db_path=f"{get_shared_data()['data_path']}/ppls.db")
    try:
        # Index 1 of each table_info row is the column name.
        columns_info = db.query("PRAGMA table_info(ppls)")
        column_names = [col[1] for col in columns_info]
        rows = db.query("SELECT * FROM ppls")
    finally:
        # Release the connection even when one of the queries raises.
        db.close()

    all_data = []
    for row in rows:
        data = dict(zip(column_names, row))
        # Best-effort decoding of JSON-looking strings back into nested objects.
        for key, value in data.items():
            if isinstance(value, str) and value.startswith(("{", "[")):
                try:
                    data[key] = json.loads(value)
                except (ValueError, RecursionError):
                    # Not valid JSON after all; keep the original string.
                    pass
        all_data.append(data)
    return all_data

def export_ppl_to_json(pplid: str, output_path: Optional[str] = None, output_dir: Optional[str] = None) -> str:
    """Write one pipeline config, or all of them, to JSON file(s).

    Args:
        pplid: Pipeline ID to export, or ``"*"`` to export every pipeline.
        output_path: Destination file for a single export. When empty, the
            file is named ``<pplid>.json`` inside ``output_dir`` if given,
            otherwise in the current directory.
        output_dir: Destination directory. Required when ``pplid`` is ``"*"``.

    Returns:
        The written file path for a single export, or ``output_dir`` for a
        wildcard export.

    Raises:
        ValueError: If ``pplid`` is ``"*"`` and ``output_dir`` is empty.
    """
    if pplid != "*":
        # Look the pipeline up first so nothing is created for an unknown ID.
        record = _get_ppl_data(pplid)

        target = output_path
        if not target:
            if output_dir:
                target = os.path.join(output_dir, f"{pplid}.json")
            else:
                target = f"./{pplid}.json"

        parent = os.path.dirname(target)
        if parent:
            os.makedirs(parent, exist_ok=True)

        with open(target, "w", encoding="utf-8") as handle:
            json.dump(record, handle, indent=4)
        return target

    # Wildcard export: one file per pipeline inside output_dir.
    if not output_dir:
        raise ValueError("output_dir must be provided when using wildcard '*'")
    os.makedirs(output_dir, exist_ok=True)

    for record in _get_all_ppl_data():
        name = record.get("pplid", "unknown")
        destination = os.path.join(output_dir, f"{name}.json")
        with open(destination, "w", encoding="utf-8") as handle:
            json.dump(record, handle, indent=4)
    return output_dir

def export_ppl_to_yaml(pplid: str, output_path: Optional[str] = None, output_dir: Optional[str] = None) -> str:
    """Write one pipeline config, or all of them, to YAML file(s).

    Args:
        pplid: Pipeline ID to export, or ``"*"`` to export every pipeline.
        output_path: Destination file for a single export. When empty, the
            file is named ``<pplid>.yaml`` inside ``output_dir`` if given,
            otherwise in the current directory.
        output_dir: Destination directory. Required when ``pplid`` is ``"*"``.

    Returns:
        The written file path for a single export, or ``output_dir`` for a
        wildcard export.

    Raises:
        ImportError: If the ``yaml`` module cannot be imported.
        ValueError: If ``pplid`` is ``"*"`` and ``output_dir`` is empty.
    """
    # Imported lazily so PyYAML is only needed by callers that export YAML.
    try:
        import yaml
    except ImportError as err:
        raise ImportError("The 'pyyaml' package is required for YAML export. Please install it using 'pip install pyyaml'.") from err

    if pplid == "*":
        if not output_dir:
            raise ValueError("output_dir must be provided when using wildcard '*'")
        os.makedirs(output_dir, exist_ok=True)
        for ppl in _get_all_ppl_data():
            p_id = ppl.get("pplid", "unknown")
            path = os.path.join(output_dir, f"{p_id}.yaml")
            with open(path, "w", encoding="utf-8") as f:
                yaml.dump(ppl, f, default_flow_style=False)
        return output_dir

    # Look the pipeline up first so nothing is created for an unknown ID.
    data = _get_ppl_data(pplid)
    if not output_path:
        output_path = os.path.join(output_dir, f"{pplid}.yaml") if output_dir else f"./{pplid}.yaml"

    dirname = os.path.dirname(output_path)
    if dirname:
        os.makedirs(dirname, exist_ok=True)

    with open(output_path, "w", encoding="utf-8") as f:
        yaml.dump(data, f, default_flow_style=False)
    return output_path
67 changes: 67 additions & 0 deletions tests/test_export_utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import os
import json
import tempfile
import unittest
from unittest.mock import patch
import yaml
from plf.experiment import export_ppl_to_json, export_ppl_to_yaml
from plf.utils import Db

class TestPplExportUtilities(unittest.TestCase):
    """Tests for ``export_ppl_to_json`` and ``export_ppl_to_yaml``.

    Every test runs against a throwaway ``ppls.db`` seeded with one row, and
    patches ``plf.experiment.get_shared_data`` so its ``data_path`` points at
    the temporary directory holding that database.
    """

    def setUp(self):
        # Scratch directory shared by the database and the exported files.
        self.test_dir = tempfile.TemporaryDirectory()
        self.data_path = self.test_dir.name
        self.db_path = os.path.join(self.data_path, "ppls.db")

        create_table_sql = """
            CREATE TABLE IF NOT EXISTS ppls (
                pplid TEXT PRIMARY KEY,
                status TEXT,
                created_at TEXT,
                workflow TEXT,
                args TEXT
            )
        """
        insert_sql = "INSERT INTO ppls (pplid, status, created_at, workflow, args) VALUES (?, ?, ?, ?, ?)"
        # workflow and args are stored as JSON text, as the exporters expect.
        seed_row = (
            "ppl_data_run_001",
            "frozen",
            "2026-11-02T14:30:00",
            '{"loc": "my_workflows.GenericDataWorkflow"}',
            '{"param": 42}',
        )

        db = Db(db_path=self.db_path)
        db.execute(create_table_sql)
        db.execute(insert_sql, seed_row)
        db.close()

    def tearDown(self):
        # Remove the scratch directory and everything written into it.
        self.test_dir.cleanup()

    @patch('plf.experiment.get_shared_data')
    def test_export_to_json_success(self, mock_get_shared_data):
        """A known pipeline is written to the requested JSON path."""
        mock_get_shared_data.return_value = {"data_path": self.data_path}
        json_file = os.path.join(self.data_path, "output.json")

        export_ppl_to_json("ppl_data_run_001", output_path=json_file)

        self.assertTrue(os.path.exists(json_file))
        with open(json_file, "r", encoding="utf-8") as handle:
            exported = json.load(handle)
        self.assertEqual(exported["pplid"], "ppl_data_run_001")
        self.assertEqual(exported["status"], "frozen")

    @patch('plf.experiment.get_shared_data')
    def test_export_to_yaml_success(self, mock_get_shared_data):
        """A known pipeline is written to YAML with ``args`` decoded."""
        mock_get_shared_data.return_value = {"data_path": self.data_path}
        yaml_file = os.path.join(self.data_path, "output.yaml")

        export_ppl_to_yaml("ppl_data_run_001", output_path=yaml_file)

        self.assertTrue(os.path.exists(yaml_file))
        with open(yaml_file, "r", encoding="utf-8") as handle:
            exported = yaml.safe_load(handle)
        self.assertEqual(exported["pplid"], "ppl_data_run_001")
        # args is nested, so the stored JSON text was parsed before dumping.
        self.assertEqual(exported["args"]["param"], 42)

    @patch('plf.experiment.get_shared_data')
    def test_export_invalid_id_raises_error(self, mock_get_shared_data):
        """Exporting an unknown pipeline ID raises ``ValueError``."""
        mock_get_shared_data.return_value = {"data_path": self.data_path}
        missing_target = os.path.join(self.data_path, "fail.json")
        self.assertRaises(
            ValueError,
            export_ppl_to_json,
            "non_existent_id",
            output_path=missing_target,
        )