Preserve namespaces and custom requirements#2
Conversation
Add support for calrissian:DaskGatewayRequirements
There was a problem hiding this comment.
Pull request overview
This PR adds support for preserving custom namespaced CWL requirements (e.g., Calrissian/DaskGateway) by stripping them before cwl_utils validation/parsing and reinjecting them during serialization, plus a small scatter reference-cleanup fix.
Changes:
- Clean and cache custom namespaced requirements prior to validation/parsing.
- Add serialization helper to re-inject cached custom requirements and restore root
$namespaces. - Add utility to extract Dask Gateway configuration from cached custom requirements.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
src/cwl_loader/utils.py |
Adjusts scatter reference cleaning to strip both workflow and step prefixes. |
src/cwl_loader/__init__.py |
Adds caching/cleaning of custom requirements before parse, plus dump and extraction utilities. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| _custom_requirements_cache = {} | ||
| _original_namespaces = {} |
There was a problem hiding this comment.
_custom_requirements_cache and original_namespaces are module-level globals and are never cleared. Loading multiple CWL documents in the same process can leak custom requirements/namespaces between loads (and is not thread-safe), which can lead to incorrect serialization when using the default cache in dump_cwl_with_custom_requirements. Consider clearing these caches at the start of each load, or (preferably) returning a per-document cache/context from load* and passing it explicitly to dump/extract helpers.
| cleaned = raw_process.copy() if isinstance(raw_process, dict) else CommentedMap(raw_process) | ||
|
|
||
| if '$namespaces' in cleaned: | ||
| _original_namespaces['__root__'] = dict(cleaned['$namespaces']) | ||
| logger.debug(f"Saved original $namespaces: {_original_namespaces['__root__']}") | ||
|
|
||
| # The custom requirements in $graph items | ||
| # We need to remove them for validation, but preserve them for Calrissian/custom runners | ||
| if '$graph' in cleaned and isinstance(cleaned['$graph'], list): | ||
| for item in cleaned['$graph']: | ||
| if isinstance(item, dict): |
There was a problem hiding this comment.
_clean_custom_namespaces only makes a shallow copy of raw_process, but then mutates nested objects (e.g., item['requirements'] inside $graph). This can inadvertently modify the caller’s raw_process in-place, which is surprising and can break any later use of the original YAML object (including round-tripping). Consider deep-copying the relevant substructures or building a new cleaned $graph list/items before modifying requirements.
| # The custom requirements in $graph items | ||
| # We need to remove them for validation, but preserve them for Calrissian/custom runners | ||
| if '$graph' in cleaned and isinstance(cleaned['$graph'], list): | ||
| for item in cleaned['$graph']: | ||
| if isinstance(item, dict): | ||
| item_id = item.get('id', 'unknown') | ||
|
|
||
| if 'requirements' in item and isinstance(item['requirements'], dict): | ||
| custom_reqs = {} | ||
| standard_reqs = {} | ||
|
|
||
| for req_name, req_value in item['requirements'].items(): | ||
| if ':' in str(req_name): | ||
| logger.debug(f"Storing custom requirement for {item_id}: {req_name}") | ||
| custom_reqs[req_name] = req_value | ||
| else: | ||
| standard_reqs[req_name] = req_value | ||
|
|
||
| if custom_reqs: | ||
| _custom_requirements_cache[item_id] = custom_reqs | ||
|
|
||
| item['requirements'] = standard_reqs | ||
|
|
||
| elif 'requirements' in item and isinstance(item['requirements'], list): | ||
| custom_reqs = [] | ||
| standard_reqs = [] | ||
|
|
||
| for req in item['requirements']: | ||
| if isinstance(req, dict): | ||
| req_class = req.get('class', '') | ||
| if ':' in str(req_class): | ||
| logger.debug(f"Storing custom requirement for {item_id}: {req_class}") | ||
| custom_reqs.append(req) | ||
| else: | ||
| standard_reqs.append(req) | ||
| else: | ||
| standard_reqs.append(req) | ||
|
|
||
| if custom_reqs: | ||
| _custom_requirements_cache[item_id] = custom_reqs | ||
|
|
||
| item['requirements'] = standard_reqs | ||
|
|
There was a problem hiding this comment.
Custom namespaced requirements are only removed from '$graph' items. For CWL documents that are not a $graph (single CommandLineTool/Workflow with a namespaced requirement), the namespaced requirement remains and can still trigger the validation/parsing failure this PR aims to avoid. Consider applying the same requirements-splitting logic to the top-level 'requirements' (and any other process objects outside $graph) as well.
| # The custom requirements in $graph items | |
| # We need to remove them for validation, but preserve them for Calrissian/custom runners | |
| if '$graph' in cleaned and isinstance(cleaned['$graph'], list): | |
| for item in cleaned['$graph']: | |
| if isinstance(item, dict): | |
| item_id = item.get('id', 'unknown') | |
| if 'requirements' in item and isinstance(item['requirements'], dict): | |
| custom_reqs = {} | |
| standard_reqs = {} | |
| for req_name, req_value in item['requirements'].items(): | |
| if ':' in str(req_name): | |
| logger.debug(f"Storing custom requirement for {item_id}: {req_name}") | |
| custom_reqs[req_name] = req_value | |
| else: | |
| standard_reqs[req_name] = req_value | |
| if custom_reqs: | |
| _custom_requirements_cache[item_id] = custom_reqs | |
| item['requirements'] = standard_reqs | |
| elif 'requirements' in item and isinstance(item['requirements'], list): | |
| custom_reqs = [] | |
| standard_reqs = [] | |
| for req in item['requirements']: | |
| if isinstance(req, dict): | |
| req_class = req.get('class', '') | |
| if ':' in str(req_class): | |
| logger.debug(f"Storing custom requirement for {item_id}: {req_class}") | |
| custom_reqs.append(req) | |
| else: | |
| standard_reqs.append(req) | |
| else: | |
| standard_reqs.append(req) | |
| if custom_reqs: | |
| _custom_requirements_cache[item_id] = custom_reqs | |
| item['requirements'] = standard_reqs | |
| def _split_requirements_for_item(item: Any, item_id: str) -> None: | |
| """ | |
| Split requirements for a single process-like item into standard and custom | |
| (namespaced) requirements. Custom requirements are cached in | |
| _custom_requirements_cache and removed from the item so that validation | |
| does not fail on unknown requirement classes. | |
| """ | |
| if not isinstance(item, dict): | |
| return | |
| if 'requirements' in item and isinstance(item['requirements'], dict): | |
| custom_reqs = {} | |
| standard_reqs = {} | |
| for req_name, req_value in item['requirements'].items(): | |
| if ':' in str(req_name): | |
| logger.debug(f"Storing custom requirement for {item_id}: {req_name}") | |
| custom_reqs[req_name] = req_value | |
| else: | |
| standard_reqs[req_name] = req_value | |
| if custom_reqs: | |
| _custom_requirements_cache[item_id] = custom_reqs | |
| item['requirements'] = standard_reqs | |
| elif 'requirements' in item and isinstance(item['requirements'], list): | |
| custom_reqs = [] | |
| standard_reqs = [] | |
| for req in item['requirements']: | |
| if isinstance(req, dict): | |
| req_class = req.get('class', '') | |
| if ':' in str(req_class): | |
| logger.debug(f"Storing custom requirement for {item_id}: {req_class}") | |
| custom_reqs.append(req) | |
| else: | |
| standard_reqs.append(req) | |
| else: | |
| standard_reqs.append(req) | |
| if custom_reqs: | |
| _custom_requirements_cache[item_id] = custom_reqs | |
| item['requirements'] = standard_reqs | |
| # The custom requirements in $graph items | |
| # We need to remove them for validation, but preserve them for Calrissian/custom runners | |
| if '$graph' in cleaned and isinstance(cleaned['$graph'], list): | |
| for item in cleaned['$graph']: | |
| if isinstance(item, dict): | |
| item_id = item.get('id', 'unknown') | |
| _split_requirements_for_item(item, item_id) | |
| # Also handle top-level requirements for documents that are not a $graph | |
| # (or for any root-level requirements in addition to $graph items) | |
| root_id = cleaned.get('id', '__root__') | |
| _split_requirements_for_item(cleaned, root_id) |
| """ | ||
| Remove $namespaces, schemas and custom requirements with namespace prefixes. | ||
| Store custom requirements in a cache for later retrieval. | ||
|
|
||
| Note: $namespaces and schemas declarations don't cause validation errors by themselves. | ||
| Only the use of custom requirements (like calrissian:DaskGatewayRequirement) causes errors | ||
| in cwl_utils parser because it doesn't recognize them. | ||
|
|
||
| Args: | ||
| raw_process: The raw CWL document as dict or CommentedMap | ||
|
|
||
| Returns: | ||
| Cleaned CWL document without custom namespaced requirements | ||
| """ | ||
| cleaned = raw_process.copy() if isinstance(raw_process, dict) else CommentedMap(raw_process) | ||
|
|
||
| if '$namespaces' in cleaned: | ||
| _original_namespaces['__root__'] = dict(cleaned['$namespaces']) | ||
| logger.debug(f"Saved original $namespaces: {_original_namespaces['__root__']}") |
There was a problem hiding this comment.
The docstring says this function removes '$namespaces' and 'schemas', but the implementation currently only saves '$namespaces' and does not remove either '$namespaces' or 'schemas' from the cleaned document. Please either update the docstring to match the behavior or implement the described removal (while still preserving/restoring as needed).
|
|
||
| # Add all properties from req_value | ||
| if isinstance(req_value, dict): | ||
| custom_req_entry.update(req_value) |
There was a problem hiding this comment.
When reinjecting dict-form custom requirements, non-dict requirement values are silently dropped (only {'class': req_name} is emitted). If a custom requirement’s value can be non-mapping (e.g., null/scalar), this will lose information on serialization. Consider preserving the original structure/value (or at least logging/warning/raising) instead of silently discarding it.
| custom_req_entry.update(req_value) | |
| custom_req_entry.update(req_value) | |
| else: | |
| # Preserve non-mapping values under a dedicated key | |
| custom_req_entry['value'] = req_value | |
| logger.warning( | |
| "Non-mapping value for custom requirement %r on process %r " | |
| "was preserved under the 'value' key during serialization.", | |
| req_name, | |
| item_id, | |
| ) |
| # Clean custom namespaces and requirements before processing | ||
| cleaned_process = _clean_custom_namespaces(raw_process) | ||
|
|
||
| updated_process = cleaned_process |
There was a problem hiding this comment.
New behavior is introduced to strip and cache custom namespaced requirements before parsing, plus new dumping/extraction helpers, but there are no tests covering these cases. Please add tests that (1) load a CWL with a namespaced requirement (both single-process and $graph forms) and confirm parsing succeeds, and (2) verify dump_cwl_with_custom_requirements round-trips the custom requirement back into the serialized YAML, and extract_dask_config returns the expected fields.
This pull request introduces support for handling custom namespaced requirements in CWL documents, particularly those used by Calrissian and DaskGateway. The changes focus on cleaning and caching custom requirements before validation, reinjecting them during serialization, and providing utilities to extract Dask configuration. These improvements help ensure compatibility with CWL parsers while preserving custom requirements for specialized runners.
ref. Terradue/pycalrissian#23
ref. https://github.com/EOEPCA/roadmap/issues/520