From 7820092324849b9051c74da08bba2b8cae10711a Mon Sep 17 00:00:00 2001 From: jucordero Date: Mon, 20 Apr 2026 19:55:54 +0100 Subject: [PATCH 1/7] pipe node decorator --- agrifoodpy/pipeline/pipeline.py | 49 ++++++++++++- agrifoodpy/pipeline/tests/test_pipeline.py | 84 +++++++++++++++++++++- 2 files changed, 130 insertions(+), 3 deletions(-) diff --git a/agrifoodpy/pipeline/pipeline.py b/agrifoodpy/pipeline/pipeline.py index 9104a61..8966254 100644 --- a/agrifoodpy/pipeline/pipeline.py +++ b/agrifoodpy/pipeline/pipeline.py @@ -10,7 +10,7 @@ import time import yaml import importlib - +from ..utils.dict_utils import get_dict class Pipeline(): '''Class for constructing and running pipelines of functions with @@ -293,3 +293,50 @@ def wrapper(*args, **kwargs): return result return wrapper return pipeline_decorator + + +def pipeline_node(input_keys): + """ Decorator to make a function compatible with pipeline execution + + If a datablock is passed as a kwarg, the function will be executed in + pipeline mode, and the datasets associated with the arguments in + input_keys will be extracted from the datablock and passed to the function. + Unregistered keyword arguments will be passed directly to the function. + + Parameters + ---------- + input_keys: list of strings + List of dataset keys to be extracted from the datablock and passed to + the decorated function. + + Returns + ------- + wrapper: function + The decorated function + """ + def pipeline_decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + + # Identify positional arguments + func_sig = signature(func) + func_params = func_sig.parameters + + kwargs.update({key: arg for key, arg in zip(func_params.keys(), args)}) + + # Check whether the function is being called in a pipeline or not + datablock = kwargs.pop("datablock", None) + return_key = kwargs.pop("return_key", func.__name__) + + if datablock is None: + return func(**kwargs) + + else: + for key in input_keys: + kwargs[key] = get_dict(datablock, kwargs[key]) + result = func(**kwargs) + + datablock[return_key] = result + return datablock + return wrapper + return pipeline_decorator \ No newline at end of file diff --git a/agrifoodpy/pipeline/tests/test_pipeline.py b/agrifoodpy/pipeline/tests/test_pipeline.py index 4e05d49..4cb9cd9 100644 --- a/agrifoodpy/pipeline/tests/test_pipeline.py +++ b/agrifoodpy/pipeline/tests/test_pipeline.py @@ -1,4 +1,4 @@ -from agrifoodpy.pipeline.pipeline import Pipeline, standalone +from agrifoodpy.pipeline import Pipeline, standalone def test_init(): pipeline = Pipeline() @@ -202,4 +202,84 @@ def pipeline_decorated(x, out_key, datablock=None): pipeline.add_node(pipeline_decorated, params={'x': 'x', 'out_key': 'result'}) pipeline.run() - assert pipeline.datablock['result'] == 15 \ No newline at end of file + assert pipeline.datablock['result'] == 15 + +def test_pipeline_node_decorator(): + + from agrifoodpy.pipeline.pipeline import Pipeline, pipeline_node + + test_datablock_single = {'value1': 5, 'value2': 10} + test_pipeline_single = Pipeline(test_datablock_single) + + # Test decorated function with single input key and no return key + @pipeline_node(['x']) + def double_numbers(x): + return x * 2 + + test_pipeline_single.add_node( + double_numbers, + params={'x': 'value1'} + ) + + test_pipeline_single.run() + assert double_numbers(test_datablock_single['value1']) == 10 + assert double_numbers.__name__ in test_pipeline_single.datablock + assert test_pipeline_single.datablock[double_numbers.__name__] == 10 + + # Test decorated function with multiple input keys and no return key + test_datablock_multiple = {'value1': 5, 'value2': 10} + test_pipeline_multiple = Pipeline(test_datablock_multiple) + + @pipeline_node(['x', 'y']) + def sum_numbers(x, y): + return x + y + + test_pipeline_multiple.add_node( + sum_numbers, + params={'x': 'value1', 'y': 'value2'} + ) + + test_pipeline_multiple.run() + assert sum_numbers( + test_datablock_multiple['value1'], + test_datablock_multiple['value2']) == 15 + assert sum_numbers.__name__ in test_pipeline_multiple.datablock + assert test_pipeline_multiple.datablock[sum_numbers.__name__] == 15 + + # Test decorated function with multiple input keys and return key + test_datablock_with_return = {'value1': 5, 'value2': 10} + test_pipeline_with_return = Pipeline(test_datablock_with_return) + return_key = "result" + + @pipeline_node(['x', 'y']) + def subtract_numbers(x, y): + return x - y + + test_pipeline_with_return.add_node( + subtract_numbers, + params={'x': 'value1', 'y': 'value2', "return_key": return_key} + ) + + test_pipeline_with_return.run() + assert subtract_numbers( + test_datablock_with_return['value1'], + test_datablock_with_return['value2']) == -5 + assert return_key in test_pipeline_with_return.datablock + assert test_pipeline_with_return.datablock[return_key] == -5 + + #test decorated function with external function + + test_datablock_external = {'value1': [1, 2, 3]} + test_pipeline_external = Pipeline(test_datablock_external) + + import numpy as np + + test_pipeline_external.add_node( + pipeline_node(input_keys=["a"])(np.mean), + params={'a': 'value1', 'return_key': "mean_result"} + ) + + test_pipeline_external.run() + assert np.mean(test_datablock_external['value1']) == 2 + assert "mean_result" in test_pipeline_external.datablock + assert test_pipeline_external.datablock["mean_result"] == 2 \ No newline at end of file From 81c8daf748893924a1450ef52c949311680aa77d Mon Sep 17 00:00:00 2001 From: jucordero Date: Mon, 20 Apr 2026 21:18:10 +0100 Subject: [PATCH 2/7] fixed __init__ imports --- agrifoodpy/pipeline/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/agrifoodpy/pipeline/__init__.py b/agrifoodpy/pipeline/__init__.py index 26952e7..8982719 100644 --- a/agrifoodpy/pipeline/__init__.py +++ b/agrifoodpy/pipeline/__init__.py @@ -2,5 +2,4 @@ This module provides methods to build a pipeline for the AgriFoodPy package. """ -from .pipeline import * -from ..utils.dict_utils import * \ No newline at end of file +from .pipeline import * \ No newline at end of file From ea76f579d6a1c95afd99668d1df5c34a63fb5128 Mon Sep 17 00:00:00 2001 From: jucordero Date: Tue, 21 Apr 2026 14:53:31 +0100 Subject: [PATCH 3/7] dict returns and docs --- agrifoodpy/pipeline/pipeline.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/agrifoodpy/pipeline/pipeline.py b/agrifoodpy/pipeline/pipeline.py index 8966254..5876b4d 100644 --- a/agrifoodpy/pipeline/pipeline.py +++ b/agrifoodpy/pipeline/pipeline.py @@ -10,7 +10,7 @@ import time import yaml import importlib -from ..utils.dict_utils import get_dict +from ..utils.dict_utils import get_dict, set_dict class Pipeline(): '''Class for constructing and running pipelines of functions with @@ -299,9 +299,12 @@ def pipeline_node(input_keys): """ Decorator to make a function compatible with pipeline execution If a datablock is passed as a kwarg, the function will be executed in - pipeline mode, and the datasets associated with the arguments in + pipeline mode, and the objects associated with the arguments in input_keys will be extracted from the datablock and passed to the function. Unregistered keyword arguments will be passed directly to the function. + Decorated function take a "return_key" kwarg to specify the key under which + the function output will be stored in the datablock. If not provided, the + function name will be used as the return key. Parameters ---------- @@ -313,7 +316,7 @@ def pipeline_node(input_keys): ------- wrapper: function The decorated function - """ + """ def pipeline_decorator(func): @wraps(func) def wrapper(*args, **kwargs): @@ -336,7 +339,8 @@ def wrapper(*args, **kwargs): kwargs[key] = get_dict(datablock, kwargs[key]) result = func(**kwargs) - datablock[return_key] = result + set_dict(datablock, return_key, result) + return datablock return wrapper return pipeline_decorator \ No newline at end of file From 0b2382ee33bb3136ff246c4ac22df1682b361208 Mon Sep 17 00:00:00 2001 From: jucordero Date: Wed, 22 Apr 2026 13:03:25 +0100 Subject: [PATCH 4/7] error conditions, updated tests --- agrifoodpy/pipeline/pipeline.py | 43 ++++++++++++++++------ agrifoodpy/pipeline/tests/test_pipeline.py | 22 ++++++++++- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/agrifoodpy/pipeline/pipeline.py b/agrifoodpy/pipeline/pipeline.py index 5876b4d..ab5ee4b 100644 --- a/agrifoodpy/pipeline/pipeline.py +++ b/agrifoodpy/pipeline/pipeline.py @@ -80,7 +80,7 @@ def datablock_write(self, path, value): current = current.setdefault(key, {}) current[path[-1]] = value - def add_node(self, node, params={}, name=None, index=None): + def add_node(self, node, params=None, name=None, index=None): """Adds a node to the pipeline, including its function and execution parameters. @@ -99,7 +99,7 @@ def add_node(self, node, params={}, name=None, index=None): """ # Copy the parameters to avoid modifying the original dictionaries - params = copy.deepcopy(params) + params = copy.deepcopy(params) if params is not None else {} if name is None: name = "Node {}".format(len(self.nodes) + 1) @@ -318,26 +318,45 @@ def pipeline_node(input_keys): The decorated function """ def pipeline_decorator(func): + reserved = {"datablock", "return_key"} + if reserved & set(signature(func).parameters): + raise ValueError(f"Function {func.__name__} has reserved parameter" + f" names {reserved & set(signature(func).parameters)}." + "Please rename these parameters to use the" + "pipeline_node decorator.") + + func_params = signature(func).parameters + unknown = set(input_keys) - set(func_params.keys()) + if unknown: + raise ValueError(f"input_keys {unknown} not found in parameters " + f"of '{func.__name__}'") + @wraps(func) def wrapper(*args, **kwargs): - - # Identify positional arguments - func_sig = signature(func) - func_params = func_sig.parameters - kwargs.update({key: arg for key, arg in zip(func_params.keys(), args)}) - - # Check whether the function is being called in a pipeline or not + # Pop wrapper-specific kwargs datablock = kwargs.pop("datablock", None) return_key = kwargs.pop("return_key", func.__name__) + # Bind positional and keyword args to their parameter names + func_sig = signature(func) + try: + bound = func_sig.bind(*args, **kwargs) + except TypeError: + raise KeyError(f"Missing required argument for function" + f"{func.__name__}.") + + bound.apply_defaults() + if datablock is None: - return func(**kwargs) + return func(*bound.args, **bound.kwargs) else: + pipeline_kwargs = dict(bound.arguments) for key in input_keys: - kwargs[key] = get_dict(datablock, kwargs[key]) - result = func(**kwargs) + pipeline_kwargs[key] = get_dict(datablock, + pipeline_kwargs[key]) + result = func(**pipeline_kwargs) set_dict(datablock, return_key, result) diff --git a/agrifoodpy/pipeline/tests/test_pipeline.py b/agrifoodpy/pipeline/tests/test_pipeline.py index 4cb9cd9..885ebed 100644 --- a/agrifoodpy/pipeline/tests/test_pipeline.py +++ b/agrifoodpy/pipeline/tests/test_pipeline.py @@ -282,4 +282,24 @@ def subtract_numbers(x, y): test_pipeline_external.run() assert np.mean(test_datablock_external['value1']) == 2 assert "mean_result" in test_pipeline_external.datablock - assert test_pipeline_external.datablock["mean_result"] == 2 \ No newline at end of file + assert test_pipeline_external.datablock["mean_result"] == 2 + + # Test decorated function with reserved parameter names + try: + @pipeline_node(['x']) + def reserved_param_node(x, datablock=None): + pass + + except ValueError as e: + assert "reserved parameter names" in str(e) + assert "datablock" in str(e) + + + # Test decorated function with unknown input keys + try: + @pipeline_node(['wrong_key']) + def unknown_input_node(right_key): + pass + + except ValueError as e: + assert "input_keys {'wrong_key'} not found in parameters" in str(e) \ No newline at end of file From 38cdb0e805655f67315be811ded3f604705b63f3 Mon Sep 17 00:00:00 2001 From: Juan Pablo Cordero <34517350+jucordero@users.noreply.github.com> Date: Wed, 22 Apr 2026 14:20:33 +0100 Subject: [PATCH 5/7] Documentation update Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- agrifoodpy/pipeline/pipeline.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/agrifoodpy/pipeline/pipeline.py b/agrifoodpy/pipeline/pipeline.py index ab5ee4b..381dbc3 100644 --- a/agrifoodpy/pipeline/pipeline.py +++ b/agrifoodpy/pipeline/pipeline.py @@ -299,18 +299,19 @@ def pipeline_node(input_keys): """ Decorator to make a function compatible with pipeline execution If a datablock is passed as a kwarg, the function will be executed in - pipeline mode, and the objects associated with the arguments in - input_keys will be extracted from the datablock and passed to the function. - Unregistered keyword arguments will be passed directly to the function. - Decorated function take a "return_key" kwarg to specify the key under which - the function output will be stored in the datablock. If not provided, the + pipeline mode, and the values of the parameters named in input_keys will + be interpreted as datablock lookup keys. The corresponding objects will be + extracted from the datablock and passed to the function. Unregistered + keyword arguments will be passed directly to the function. The decorated + function takes a "return_key" kwarg to specify the key under which the + function output will be stored in the datablock. If not provided, the function name will be used as the return key. Parameters ---------- input_keys: list of strings - List of dataset keys to be extracted from the datablock and passed to - the decorated function. + List of decorated function parameter names whose values will be used as + datablock lookup keys in pipeline mode. Returns ------- From f9288f9412858a3f183b16456a4f2b2131a9f9e0 Mon Sep 17 00:00:00 2001 From: Juan Pablo Cordero <34517350+jucordero@users.noreply.github.com> Date: Wed, 22 Apr 2026 14:24:09 +0100 Subject: [PATCH 6/7] Update exception error type Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- agrifoodpy/pipeline/pipeline.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/agrifoodpy/pipeline/pipeline.py b/agrifoodpy/pipeline/pipeline.py index 381dbc3..9ec1996 100644 --- a/agrifoodpy/pipeline/pipeline.py +++ b/agrifoodpy/pipeline/pipeline.py @@ -343,9 +343,10 @@ def wrapper(*args, **kwargs): func_sig = signature(func) try: bound = func_sig.bind(*args, **kwargs) - except TypeError: - raise KeyError(f"Missing required argument for function" - f"{func.__name__}.") + except TypeError as e: + raise TypeError( + f"Invalid arguments for function {func.__name__}." + ) from e bound.apply_defaults() From 8b14ed6822b3f46d04057140a45aecb1d6d9e44c Mon Sep 17 00:00:00 2001 From: jucordero Date: Wed, 22 Apr 2026 15:03:07 +0100 Subject: [PATCH 7/7] single key functions, new tests --- agrifoodpy/pipeline/pipeline.py | 18 +++++--- agrifoodpy/pipeline/tests/test_pipeline.py | 53 ++++++++++++++++------ 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/agrifoodpy/pipeline/pipeline.py b/agrifoodpy/pipeline/pipeline.py index 9ec1996..08fda2f 100644 --- a/agrifoodpy/pipeline/pipeline.py +++ b/agrifoodpy/pipeline/pipeline.py @@ -295,7 +295,7 @@ def wrapper(*args, **kwargs): return pipeline_decorator -def pipeline_node(input_keys): +def pipeline_node(input_keys=None): """ Decorator to make a function compatible with pipeline execution If a datablock is passed as a kwarg, the function will be executed in @@ -309,7 +309,7 @@ def pipeline_node(input_keys): Parameters ---------- - input_keys: list of strings + input_keys: string or list of strings, optional List of decorated function parameter names whose values will be used as datablock lookup keys in pipeline mode. @@ -318,6 +318,13 @@ def pipeline_node(input_keys): wrapper: function The decorated function """ + + if input_keys is not None: + if isinstance(input_keys, str): + input_keys = [input_keys] + else: + input_keys = [] + def pipeline_decorator(func): reserved = {"datablock", "return_key"} if reserved & set(signature(func).parameters): @@ -354,11 +361,10 @@ def wrapper(*args, **kwargs): return func(*bound.args, **bound.kwargs) else: - pipeline_kwargs = dict(bound.arguments) for key in input_keys: - pipeline_kwargs[key] = get_dict(datablock, - pipeline_kwargs[key]) - result = func(**pipeline_kwargs) + bound.arguments[key] = get_dict(datablock, + bound.arguments[key]) + result = func(*bound.args, **bound.kwargs) set_dict(datablock, return_key, result) diff --git a/agrifoodpy/pipeline/tests/test_pipeline.py b/agrifoodpy/pipeline/tests/test_pipeline.py index 885ebed..8170a81 100644 --- a/agrifoodpy/pipeline/tests/test_pipeline.py +++ b/agrifoodpy/pipeline/tests/test_pipeline.py @@ -1,4 +1,5 @@ from agrifoodpy.pipeline import Pipeline, standalone +import pytest def test_init(): pipeline = Pipeline() @@ -212,7 +213,7 @@ def test_pipeline_node_decorator(): test_pipeline_single = Pipeline(test_datablock_single) # Test decorated function with single input key and no return key - @pipeline_node(['x']) + @pipeline_node('x') def double_numbers(x): return x * 2 @@ -226,6 +227,23 @@ def double_numbers(x): assert double_numbers.__name__ in test_pipeline_single.datablock assert test_pipeline_single.datablock[double_numbers.__name__] == 10 + # Test decorated function with single input key and unregistered key + @pipeline_node('value') + def scale_numbers(value, factor=3): + return value * factor + + test_datablock_mixed = {'value': 5} + + test_pipeline_mixed = Pipeline(test_datablock_mixed) + test_pipeline_mixed.add_node( + scale_numbers, + params={'value': 'value', 'factor': 4} + ) + test_pipeline_mixed.run() + assert scale_numbers(test_datablock_mixed['value'], factor=4) == 20 + assert scale_numbers.__name__ in test_pipeline_mixed.datablock + assert test_pipeline_mixed.datablock[scale_numbers.__name__] == 20 + # Test decorated function with multiple input keys and no return key test_datablock_multiple = {'value1': 5, 'value2': 10} test_pipeline_multiple = Pipeline(test_datablock_multiple) @@ -268,14 +286,13 @@ def subtract_numbers(x, y): assert test_pipeline_with_return.datablock[return_key] == -5 #test decorated function with external function - test_datablock_external = {'value1': [1, 2, 3]} test_pipeline_external = Pipeline(test_datablock_external) import numpy as np test_pipeline_external.add_node( - pipeline_node(input_keys=["a"])(np.mean), + pipeline_node(input_keys="a")(np.mean), params={'a': 'value1', 'return_key': "mean_result"} ) @@ -284,22 +301,30 @@ def subtract_numbers(x, y): assert "mean_result" in test_pipeline_external.datablock assert test_pipeline_external.datablock["mean_result"] == 2 + # Test decorated function with no input keys + test_pipeline_no_input = Pipeline() + + @pipeline_node([]) + def return_constant(): + return 42 + + test_pipeline_no_input.add_node( + return_constant + ) + + test_pipeline_no_input.run() + assert return_constant() == 42 + assert return_constant.__name__ in test_pipeline_no_input.datablock + assert test_pipeline_no_input.datablock[return_constant.__name__] == 42 + # Test decorated function with reserved parameter names - try: + with pytest.raises(ValueError, match="reserved parameter names.*datablock"): @pipeline_node(['x']) def reserved_param_node(x, datablock=None): pass - - except ValueError as e: - assert "reserved parameter names" in str(e) - assert "datablock" in str(e) - - # Test decorated function with unknown input keys - try: + # Test decorated function with unknown input keys + with pytest.raises(ValueError, match="input_keys.*not found in parameters"): @pipeline_node(['wrong_key']) def unknown_input_node(right_key): pass - - except ValueError as e: - assert "input_keys {'wrong_key'} not found in parameters" in str(e) \ No newline at end of file