diff --git a/streamflow/cwl/transformer.py b/streamflow/cwl/transformer.py index 9cb5e0986..cc5868cca 100644 --- a/streamflow/cwl/transformer.py +++ b/streamflow/cwl/transformer.py @@ -19,7 +19,11 @@ from streamflow.cwl.step import build_token from streamflow.cwl.workflow import CWLWorkflow from streamflow.workflow.token import ListToken, TerminationToken -from streamflow.workflow.transformer import ManyToOneTransformer, OneToOneTransformer +from streamflow.workflow.transformer import ( + ManyToOneTransformer, + OneToManyTransformer, + OneToOneTransformer, +) from streamflow.workflow.utils import get_token_value @@ -38,6 +42,14 @@ async def transform( return {self.get_output_name(): self._transform(*next(iter(inputs.items())))} +class BroadcastTransformer(OneToManyTransformer): + async def transform( + self, inputs: MutableMapping[str, Token] + ) -> MutableMapping[str, Token | MutableSequence[Token]]: + token = list(inputs.values()).pop() + return {name: token.update(token.value) for name in self.output_ports.keys()} + + class CartesianProductSizeTransformer(ManyToOneTransformer): async def transform( self, inputs: MutableMapping[str, Token] diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 1b252cb70..7de4fc3fc 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -75,6 +75,7 @@ ) from streamflow.cwl.transformer import ( AllNonNullTransformer, + BroadcastTransformer, CartesianProductSizeTransformer, CloneTransformer, CWLTokenTransformer, @@ -1833,10 +1834,36 @@ def _inject_inputs(self, workflow: Workflow) -> None: value=self.cwl_inputs[port_name], ) # Search empty unbound input ports + empty_ports = [] for input_port in workflow.ports.values(): if input_port.empty() and not input_port.get_input_steps(): - input_port.put(Token(value=None, recoverable=True)) - input_port.put(TerminationToken()) + empty_ports.append(input_port) + if len(empty_ports) == 1: + self._inject_input( + workflow=workflow, + global_name="/__empty_unbound_inputs__", + port_name="__empty_unbound_inputs__", + port=empty_ports[0], + output_directory=output_directory, + value=None, + ) + elif len(empty_ports) > 1: + step = workflow.create_step( + cls=BroadcastTransformer, + name="/__empty_unbound_inputs__-bcast", + ) + upstream_port = workflow.create_port() + step.add_input_port("__upstream__", upstream_port) + for i, downstream_port in enumerate(empty_ports): + step.add_output_port(f"__downstream_{i}__", downstream_port) + self._inject_input( + workflow=workflow, + global_name="/__empty_unbound_inputs__", + port_name="__empty_unbound_inputs__", + port=upstream_port, + output_directory=output_directory, + value=None, + ) def _recursive_translate( self, diff --git a/streamflow/workflow/step.py b/streamflow/workflow/step.py index e675b7284..7358d3c89 100644 --- a/streamflow/workflow/step.py +++ b/streamflow/workflow/step.py @@ -143,10 +143,15 @@ async def _persist_token( ) -> Token: if token.persistent_id: raise WorkflowDefinitionException( - f"Token already has an id: {token.persistent_id}" + f"Step {self.name} failed to save token: Token already has a persistent id: {token.persistent_id}" ) await token.save(self.workflow.context, port_id=port.persistent_id) if input_token_ids: + if any(id_ is None for id_ in input_token_ids): + raise WorkflowExecutionException( + f"Step {self.name} cannot establish provenance: " + f"One or more input tokens have not been persisted: {input_token_ids}" + ) await self.workflow.context.database.add_provenance( inputs=input_token_ids, token=token.persistent_id ) diff --git a/streamflow/workflow/transformer.py b/streamflow/workflow/transformer.py index 8fbfaf988..72936fb52 100644 --- a/streamflow/workflow/transformer.py +++ b/streamflow/workflow/transformer.py @@ -11,7 +11,7 @@ def add_output_port(self, name: str, port: Port) -> None: super().add_output_port(name, port) else: raise WorkflowDefinitionException( - f"{self.name} step must contain a single output port." + f"Step {self.name} must contain a single output port." ) def get_output_name(self) -> str: @@ -20,7 +20,24 @@ def get_output_name(self) -> str: async def run(self) -> None: if len(self.output_ports) != 1: raise WorkflowDefinitionException( - f"{self.name} step must contain a single output port." + f"Step {self.name} must contain a single output port." + ) + await super().run() + + +class OneToManyTransformer(Transformer, ABC): + def add_input_port(self, name: str, port: Port) -> None: + if not self.input_ports: + super().add_input_port(name, port) + else: + raise WorkflowDefinitionException( + f"Step {self.name} must contain a single input port." + ) + + async def run(self) -> None: + if len(self.input_ports) != 1: + raise WorkflowDefinitionException( + f"Step {self.name} must contain a single input port." ) await super().run() @@ -31,16 +48,16 @@ def add_input_port(self, name: str, port: Port) -> None: super().add_input_port(name, port) else: raise WorkflowDefinitionException( - f"{self.name} step must contain a single input port." + f"Step {self.name} must contain a single input port." ) async def run(self) -> None: if len(self.input_ports) != 1: raise WorkflowDefinitionException( - f"{self.name} step must contain a single input port." + f"Step {self.name} must contain a single input port." ) if len(self.output_ports) != 1: raise WorkflowDefinitionException( - f"{self.name} step must contain a single output port." + f"Step {self.name} must contain a single output port." ) await super().run()