diff --git a/README.md b/README.md
index db79a0e..e802af1 100644
--- a/README.md
+++ b/README.md
@@ -205,6 +205,23 @@ The `plf.experiment` module provides powerful tools for managing your PPL databa
 * **`delete_ppl(ppls)`**: Permanently deletes a pipeline from the archive.
 * **`stop_running()`**: Gracefully stops a currently running pipeline after its current iteration completes.
 
+### Pipeline Configuration Export Utilities
+
+You can export pipeline (PPL) configurations from the internal database into standard JSON or YAML formats for version control or inspection. YAML export requires the `pyyaml` package (`pip install pyyaml`).
+
+```python
+from plf.experiment import export_ppl_to_yaml, export_ppl_to_json
+
+# Export a single pipeline config to YAML
+export_ppl_to_yaml("ppl_data_run_001", output_path="./configs/ppl_data_run_001.yaml")
+
+# Export a single pipeline config to JSON
+export_ppl_to_json("ppl_data_run_001", output_path="./configs/ppl_data_run_001.json")
+
+# Export every pipeline in the database at once using a wildcard
+export_ppl_to_yaml("*", output_dir="./configs/")
+```
+
 ---
 
 ## 📜 License
diff --git a/src/plf/experiment.py b/src/plf/experiment.py
index 7ed97bb..ba4c54b 100644
--- a/src/plf/experiment.py
+++ b/src/plf/experiment.py
@@ -23,7 +23,9 @@
     "delete_ppl",
     "transfer_ppl",
     "group_by_common_columns",
-    'filter_ppls'
+    "filter_ppls",
+    "export_ppl_to_json",
+    "export_ppl_to_yaml",
 ]
 
 
@@ -481,3 +483,163 @@ def group_by_common_columns(
     for k, colset in cols.items():
         group_map[colset].append(k)
     return group_map
+
+
+def _decode_ppl_row(column_names: List[str], row) -> dict:
+    """Build a dict from one ``ppls`` row, decoding JSON-looking strings.
+
+    String values starting with ``{`` or ``[`` are parsed with ``json.loads``;
+    values that turn out not to be valid JSON are kept as plain strings.
+    """
+    record = dict(zip(column_names, row))
+    for key, value in record.items():
+        if isinstance(value, str) and value.startswith(("{", "[")):
+            try:
+                record[key] = json.loads(value)
+            except (ValueError, RecursionError):
+                # Best effort: text that only looks like JSON stays a string.
+                pass
+    return record
+
+
+def _fetch_ppl_records(pplid: Optional[str] = None) -> List[dict]:
+    """Read rows from ``ppls.db`` and return them as decoded dicts.
+
+    Args:
+        pplid: ID of the pipeline to fetch, or ``None`` to fetch every row.
+
+    Returns:
+        One dict per matching row; an empty list when nothing matches.
+    """
+    db = Db(db_path=f"{get_shared_data()['data_path']}/ppls.db")
+    try:
+        # Read column names from the schema so values are never mis-indexed.
+        columns_info = db.query("PRAGMA table_info(ppls)")
+        if pplid is None:
+            rows = db.query("SELECT * FROM ppls")
+        else:
+            # Bind the ID as a parameter: it may contain quotes and must
+            # never be interpreted as SQL.
+            # NOTE(review): assumes Db.query accepts (sql, params) the way
+            # Db.execute does - confirm against plf.utils.Db.
+            rows = db.query("SELECT * FROM ppls WHERE pplid = ?", (pplid,))
+    finally:
+        # Close exactly once, whether the queries succeeded or raised.
+        db.close()
+
+    column_names = [col[1] for col in columns_info]
+    return [_decode_ppl_row(column_names, row) for row in rows]
+
+
+def _get_ppl_data(pplid: str) -> dict:
+    """Fetch one pipeline's row as a dict.
+
+    Raises:
+        ValueError: If no pipeline with ID ``pplid`` exists.
+    """
+    records = _fetch_ppl_records(pplid)
+    if not records:
+        raise ValueError(f"Pipeline with ID '{pplid}' does not exist.")
+    return records[0]
+
+
+def _get_all_ppl_data() -> list:
+    """Fetch every pipeline row as a list of dicts (for wildcard exports)."""
+    return _fetch_ppl_records()
+
+
+def _export_ppl(pplid, output_path, output_dir, extension, dump) -> str:
+    """Shared implementation behind the JSON and YAML exporters.
+
+    Args:
+        pplid: Pipeline ID, or ``"*"`` to export every pipeline.
+        output_path: Target file for a single export; optional.
+        output_dir: Target directory; required when ``pplid`` is ``"*"``.
+        extension: File extension without the dot, e.g. ``"json"``.
+        dump: Callable ``dump(data, stream)`` that serialises one record.
+
+    Returns:
+        ``output_dir`` for a wildcard export, else the path of the written file.
+
+    Raises:
+        ValueError: If ``"*"`` is used without ``output_dir``, or the
+            requested pipeline does not exist.
+    """
+    if pplid == "*":
+        # Validate before touching the database or the file system.
+        if not output_dir:
+            raise ValueError("output_dir must be provided when using wildcard '*'")
+        all_ppls = _get_all_ppl_data()
+        os.makedirs(output_dir, exist_ok=True)
+        for ppl in all_ppls:
+            p_id = ppl.get("pplid", "unknown")
+            path = os.path.join(output_dir, f"{p_id}.{extension}")
+            with open(path, "w", encoding="utf-8") as f:
+                dump(ppl, f)
+        return output_dir
+
+    data = _get_ppl_data(pplid)
+    if not output_path:
+        filename = f"{pplid}.{extension}"
+        output_path = os.path.join(output_dir, filename) if output_dir else f"./{filename}"
+
+    dirname = os.path.dirname(output_path)
+    if dirname:
+        os.makedirs(dirname, exist_ok=True)
+
+    with open(output_path, "w", encoding="utf-8") as f:
+        dump(data, f)
+    return output_path
+
+
+def export_ppl_to_json(pplid: str, output_path: Optional[str] = None, output_dir: Optional[str] = None) -> str:
+    """Export pipeline configuration(s) to JSON.
+
+    Args:
+        pplid: Pipeline ID, or ``"*"`` to export every pipeline.
+        output_path: File to write for a single pipeline. Defaults to
+            ``<output_dir>/<pplid>.json``, or ``./<pplid>.json``.
+        output_dir: Directory to write into; required when ``pplid`` is ``"*"``.
+
+    Returns:
+        The written file path, or ``output_dir`` for a wildcard export.
+
+    Raises:
+        ValueError: If the pipeline does not exist, or ``"*"`` is used
+            without ``output_dir``.
+    """
+    def _dump_json(data, stream):
+        json.dump(data, stream, indent=4)
+
+    return _export_ppl(pplid, output_path, output_dir, "json", _dump_json)
+
+
+def export_ppl_to_yaml(pplid: str, output_path: Optional[str] = None, output_dir: Optional[str] = None) -> str:
+    """Export pipeline configuration(s) to YAML.
+
+    Args:
+        pplid: Pipeline ID, or ``"*"`` to export every pipeline.
+        output_path: File to write for a single pipeline. Defaults to
+            ``<output_dir>/<pplid>.yaml``, or ``./<pplid>.yaml``.
+        output_dir: Directory to write into; required when ``pplid`` is ``"*"``.
+
+    Returns:
+        The written file path, or ``output_dir`` for a wildcard export.
+
+    Raises:
+        ImportError: If the optional ``pyyaml`` package is not installed.
+        ValueError: If the pipeline does not exist, or ``"*"`` is used
+            without ``output_dir``.
+    """
+    try:
+        import yaml
+    except ImportError as exc:
+        raise ImportError(
+            "The 'pyyaml' package is required for YAML export. Please install it using 'pip install pyyaml'."
+        ) from exc
+
+    def _dump_yaml(data, stream):
+        # safe_dump emits only standard YAML tags, so yaml.safe_load can read it.
+        yaml.safe_dump(data, stream, default_flow_style=False)
+
+    return _export_ppl(pplid, output_path, output_dir, "yaml", _dump_yaml)
diff --git a/tests/test_export_utilities.py b/tests/test_export_utilities.py
new file mode 100644
index 0000000..309cf16
--- /dev/null
+++ b/tests/test_export_utilities.py
@@ -0,0 +1,97 @@
+import json
+import os
+import tempfile
+import unittest
+from unittest.mock import patch
+
+import yaml
+
+from plf.experiment import export_ppl_to_json, export_ppl_to_yaml
+from plf.utils import Db
+
+
+class TestPplExportUtilities(unittest.TestCase):
+    """Exercise the JSON/YAML export helpers against a throwaway ``ppls.db``."""
+
+    def setUp(self):
+        # Create a temporary directory for database and exports
+        self.test_dir = tempfile.TemporaryDirectory()
+        self.data_path = self.test_dir.name
+
+        # Initialize a dummy ppls.db schema for testing
+        self.db_path = os.path.join(self.data_path, "ppls.db")
+        db = Db(db_path=self.db_path)
+        db.execute("""
+            CREATE TABLE IF NOT EXISTS ppls (
+                pplid TEXT PRIMARY KEY,
+                status TEXT,
+                created_at TEXT,
+                workflow TEXT,
+                args TEXT
+            )
+        """)
+        db.execute(
+            "INSERT INTO ppls (pplid, status, created_at, workflow, args) VALUES (?, ?, ?, ?, ?)",
+            ("ppl_data_run_001", "frozen", "2026-11-02T14:30:00", '{"loc": "my_workflows.GenericDataWorkflow"}', '{"param": 42}')
+        )
+        db.close()
+
+    def tearDown(self):
+        self.test_dir.cleanup()
+
+    @patch('plf.experiment.get_shared_data')
+    def test_export_to_json_success(self, mock_get_shared_data):
+        mock_get_shared_data.return_value = {"data_path": self.data_path}
+        output_path = os.path.join(self.data_path, "output.json")
+
+        export_ppl_to_json("ppl_data_run_001", output_path=output_path)
+
+        self.assertTrue(os.path.exists(output_path))
+        with open(output_path, "r", encoding="utf-8") as f:
+            data = json.load(f)
+        self.assertEqual(data["pplid"], "ppl_data_run_001")
+        self.assertEqual(data["status"], "frozen")
+
+    @patch('plf.experiment.get_shared_data')
+    def test_export_to_yaml_success(self, mock_get_shared_data):
+        mock_get_shared_data.return_value = {"data_path": self.data_path}
+        output_path = os.path.join(self.data_path, "output.yaml")
+
+        export_ppl_to_yaml("ppl_data_run_001", output_path=output_path)
+
+        self.assertTrue(os.path.exists(output_path))
+        with open(output_path, "r", encoding="utf-8") as f:
+            data = yaml.safe_load(f)
+        self.assertEqual(data["pplid"], "ppl_data_run_001")
+        self.assertEqual(data["args"]["param"], 42)
+
+    @patch('plf.experiment.get_shared_data')
+    def test_export_invalid_id_raises_error(self, mock_get_shared_data):
+        mock_get_shared_data.return_value = {"data_path": self.data_path}
+        with self.assertRaises(ValueError):
+            export_ppl_to_json("non_existent_id", output_path=os.path.join(self.data_path, "fail.json"))
+
+    @patch('plf.experiment.get_shared_data')
+    def test_export_id_with_quote_is_not_interpreted_as_sql(self, mock_get_shared_data):
+        mock_get_shared_data.return_value = {"data_path": self.data_path}
+        # A crafted ID must be matched literally, not spliced into the query.
+        with self.assertRaises(ValueError):
+            export_ppl_to_json("x' OR '1'='1", output_path=os.path.join(self.data_path, "fail.json"))
+
+    @patch('plf.experiment.get_shared_data')
+    def test_wildcard_export_writes_one_file_per_pipeline(self, mock_get_shared_data):
+        mock_get_shared_data.return_value = {"data_path": self.data_path}
+        output_dir = os.path.join(self.data_path, "exports")
+
+        result = export_ppl_to_json("*", output_dir=output_dir)
+
+        self.assertEqual(result, output_dir)
+        self.assertEqual(os.listdir(output_dir), ["ppl_data_run_001.json"])
+
+    def test_wildcard_without_output_dir_raises_error(self):
+        with self.assertRaises(ValueError):
+            export_ppl_to_json("*")
+
+
+if __name__ == "__main__":
+    unittest.main()