From 21e3d0e4343f532c1df342eef97eeceaccb24402 Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Sat, 8 Nov 2025 10:14:18 +0000 Subject: [PATCH 01/15] `create_process` shouldn't be a public method ~ creates confusion [breaking] --- src/tasktronaut/builder.py | 2 +- src/tasktronaut/process.py | 18 ++---------------- tests/test_process.py | 7 ------- 3 files changed, 3 insertions(+), 24 deletions(-) diff --git a/src/tasktronaut/builder.py b/src/tasktronaut/builder.py index 5e58a56..824b067 100644 --- a/src/tasktronaut/builder.py +++ b/src/tasktronaut/builder.py @@ -106,7 +106,7 @@ def sub_process( description: Description = None, ): builder = Builder( - process=definition.create_process( + process=definition.execution_mode.value( identifier=self.process.identifier, definition=definition, ), diff --git a/src/tasktronaut/process.py b/src/tasktronaut/process.py index 39b7ae4..dab8a80 100644 --- a/src/tasktronaut/process.py +++ b/src/tasktronaut/process.py @@ -3,7 +3,7 @@ import uuid from abc import ABC, abstractmethod from contextlib import contextmanager -from typing import Generator, Optional, Type, cast +from typing import Generator, Optional, Type from .backend import Backend, Job from .builder import Builder @@ -123,20 +123,6 @@ class ProcessDefinition(ABC): description: Description = None execution_mode: ExecutionMode = ExecutionMode.SEQUENTIAL - @classmethod - def create_process( - cls, - identifier: str, - definition: Type["ProcessDefinition"], - ) -> Process: - return cast( - Process, - cls.execution_mode.value( - identifier=identifier, - definition=definition, - ), - ) - @classmethod def build( cls, @@ -145,7 +131,7 @@ def build( **kwargs, ) -> Process: builder = Builder( - process=cls.create_process( + process=cls.execution_mode.value( identifier=identifier or uuid.uuid4().hex, definition=cls, ), diff --git a/tests/test_process.py b/tests/test_process.py index 8d5f769..cd10e86 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -37,13 +37,6 @@ def test_repr(self): class ProcessDefinitionTestCase(unittest.TestCase): - def test_create_process(self): - result = mocks.MockDefinition.create_process( - identifier="foo", - definition=mocks.MockDefinition, - ) - self.assertIsNotNone(result) - @mock.patch.object(mocks.MockDefinition, "define_process") def test_build(self, mock_define_process): result = mocks.MockDefinition.build( From 8c83f617117a02a373fb1270d26318c976a8b229 Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Sat, 8 Nov 2025 10:07:24 +0000 Subject: [PATCH 02/15] make `define_process` method more prominent --- src/tasktronaut/process.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tasktronaut/process.py b/src/tasktronaut/process.py index dab8a80..d4ddcfa 100644 --- a/src/tasktronaut/process.py +++ b/src/tasktronaut/process.py @@ -123,6 +123,10 @@ class ProcessDefinition(ABC): description: Description = None execution_mode: ExecutionMode = ExecutionMode.SEQUENTIAL + @abstractmethod + def define_process(self, builder: Builder): # pragma: no cover + pass + @classmethod def build( cls, @@ -141,10 +145,6 @@ def build( cls().define_process(builder) return builder.process - @abstractmethod - def define_process(self, builder: Builder): # pragma: no cover - pass - def on_started(self, identifier: str): logger.info("%s started [%s].", self.__class__.__name__, identifier) From 31d4e290fe275a645a2f059ba0b62cc11a8f5077 Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Sat, 8 Nov 2025 15:17:03 +0000 Subject: [PATCH 03/15] include `tests` code 
in `pylint` linting --- Taskfile.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Taskfile.yml b/Taskfile.yml index e07689d..c4bcfc1 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -34,7 +34,7 @@ tasks: desc: Lint Python code cmds: - uv run mypy src - - uv run pylint src + - uv run pylint src tests build: desc: Build Python package From 890d163a3a76087597d2f2dc210bd116afdca5b0 Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Sat, 8 Nov 2025 15:18:02 +0000 Subject: [PATCH 04/15] add `context` argument to `around_task` hook method --- src/tasktronaut/backend.py | 1 + src/tasktronaut/process.py | 2 ++ tests/mocks.py | 5 ++++- tests/test_process.py | 4 +++- 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/tasktronaut/backend.py b/src/tasktronaut/backend.py index f883690..5677d3a 100644 --- a/src/tasktronaut/backend.py +++ b/src/tasktronaut/backend.py @@ -71,6 +71,7 @@ def perform_task( identifier=identifier, step=function_name, description=description, + context=context, ): task_func(context=context, **kwargs) diff --git a/src/tasktronaut/process.py b/src/tasktronaut/process.py index d4ddcfa..15144c6 100644 --- a/src/tasktronaut/process.py +++ b/src/tasktronaut/process.py @@ -7,6 +7,7 @@ from .backend import Backend, Job from .builder import Builder +from .context import Context from .steps import Steps from .types import Description, Options from .utils import to_dict @@ -154,6 +155,7 @@ def around_task( identifier: str, step: str, description: Description, + context: Context, # pylint: disable=unused-argument ) -> Generator[None, None, None]: logger.info( "Executing '%s' (%s) of %s process [%s].", diff --git a/tests/mocks.py b/tests/mocks.py index 0eaa041..924f4c1 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -1,9 +1,10 @@ from contextlib import contextmanager from typing import Any, Dict, Generator, List, Optional, Union -from tasktronaut import Builder, ProcessDefinition, task +from tasktronaut import Builder, Context, ProcessDefinition, task from tasktronaut.backend import Backend, Job from tasktronaut.errors import CancelProcessError, NonRetryableProcessError + from tasktronaut.types import Description @@ -81,11 +82,13 @@ def around_task( identifier: str, step: str, description: Description, + context: Context, ) -> Generator[None, None, None]: with super().around_task( identifier=identifier, step=step, description=description, + context=context, ): yield diff --git a/tests/test_process.py b/tests/test_process.py index cd10e86..b72734a 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -3,10 +3,11 @@ from typing import Optional, Type from unittest import mock -from tasktronaut import Backend +from tasktronaut import Backend, Context from tasktronaut.backend import Job from tasktronaut.process import ConcurrentProcess, Process, SequentialProcess from tasktronaut.steps import Step + from . 
import mocks @@ -57,6 +58,7 @@ def test_around_task(self): identifier="foo", step="bar", description="baz", + context=Context(), ): called = True From 3404c02b210d8fc07890090e5bab5494b020b4a3 Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Sat, 8 Nov 2025 15:18:29 +0000 Subject: [PATCH 05/15] fix linting errors --- src/tasktronaut/builder.py | 2 +- tests/smoke_test_rq.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/tasktronaut/builder.py b/src/tasktronaut/builder.py index 824b067..a21dc04 100644 --- a/src/tasktronaut/builder.py +++ b/src/tasktronaut/builder.py @@ -30,7 +30,7 @@ def __init__( ): self.process = process self.description = description - self.options = self._convert_options(options) + self.options: SimpleNamespace = self._convert_options(options) self.kwargs = kwargs @staticmethod diff --git a/tests/smoke_test_rq.py b/tests/smoke_test_rq.py index 6834a9e..f509fa6 100644 --- a/tests/smoke_test_rq.py +++ b/tests/smoke_test_rq.py @@ -9,12 +9,12 @@ from tasktronaut.backends.rq import RqBackend from tasktronaut.utils import to_kwargs -log_file = "smoke_test_rq.log" +LOG_FILE = "smoke_test_rq.log" # ensure log file cleared -pathlib.Path(log_file).unlink(missing_ok=True) +pathlib.Path(LOG_FILE).unlink(missing_ok=True) -file_handler = logging.FileHandler(log_file) +file_handler = logging.FileHandler(LOG_FILE) logger = logging.getLogger(__name__) logger.propagate = False logger.setLevel(logging.DEBUG) @@ -62,7 +62,7 @@ def main(): assert job.is_finished, "ERROR: Process never finished." - with open(log_file) as f: + with open(LOG_FILE, "r", encoding="UTF-8") as f: logs = f.read() print(logs) From aec0cb006d069042846b10501c8a050a4a59670b Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Wed, 12 Nov 2025 16:38:07 +0000 Subject: [PATCH 06/15] fix member name resolution in API documentation --- docs/source/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index efdd54d..04f7683 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -21,7 +21,7 @@ templates_path = ["_templates"] exclude_patterns = [] - +add_module_names = True # -- Options for HTML output ------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output From ce302949387322411b81f9da479ff8bd1807b004 Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Sat, 8 Nov 2025 10:14:49 +0000 Subject: [PATCH 07/15] add documentation thanks Claude for helping! --- src/tasktronaut/__init__.py | 11 ++ src/tasktronaut/backend.py | 171 ++++++++++++++++++++- src/tasktronaut/backends/rq.py | 113 ++++++++++++++ src/tasktronaut/builder.py | 234 ++++++++++++++++++++++++++++ src/tasktronaut/context.py | 17 ++- src/tasktronaut/decorators.py | 42 +++++ src/tasktronaut/errors.py | 29 +++- src/tasktronaut/process.py | 270 ++++++++++++++++++++++++++++++++- src/tasktronaut/steps.py | 55 ++++++- src/tasktronaut/types.py | 120 ++++++++++++++- src/tasktronaut/utils.py | 49 ++++++ 11 files changed, 1099 insertions(+), 12 deletions(-) diff --git a/src/tasktronaut/__init__.py b/src/tasktronaut/__init__.py index f8e3cf4..789fefc 100644 --- a/src/tasktronaut/__init__.py +++ b/src/tasktronaut/__init__.py @@ -1,3 +1,14 @@ +""" +Tasktronaut: A Python library for defining and executing process workflows. 
+ +The tasktronaut package provides a framework for building complex process definitions +with support for sequential and concurrent execution, sub-processes, conditional logic, +and iterative patterns. + +This module serves as the main entry point, exposing the primary public API for +the library. +""" + from .backend import Backend from .builder import Builder from .context import Context diff --git a/src/tasktronaut/backend.py b/src/tasktronaut/backend.py index 5677d3a..0a894b0 100644 --- a/src/tasktronaut/backend.py +++ b/src/tasktronaut/backend.py @@ -1,3 +1,7 @@ +""" +Backend module. +""" + import logging from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Protocol, Union @@ -11,15 +15,44 @@ class Job(Protocol): + """ + Protocol defining the interface for a job object. + + A job represents an enqueued unit of work in the backend system and provides + methods to manage its lifecycle. + """ + def cancel(self): # pragma: no cover - pass + """ + Cancel the execution of this job. + + Stops the job from being executed if it hasn't already started. + """ def delete_dependents(self): # pragma: no cover - pass + """ + Delete all jobs that depend on this job. + + Removes any downstream jobs that were scheduled to run after completion + of this job. + """ # pylint: disable=too-many-arguments,too-many-positional-arguments class Backend(ABC): + """ + Abstract base class defining the interface for process execution backends. + + Backends are responsible for enqueueing tasks, managing job dependencies, and + executing process lifecycle callbacks. Subclasses must implement the abstract + enqueue methods to integrate with specific job queue systems. + + The backend handles three main phases of process execution: + - Process start callbacks + - Individual task execution with error handling + - Process completion callbacks + """ + @abstractmethod def enqueue_perform_start( self, @@ -28,7 +61,25 @@ def enqueue_perform_start( definition_class: str, depends_on: Optional[Job] = None, ) -> Job: # pragma: no cover - pass + """ + Enqueue a process start callback. + + Schedules the execution of the process start callback, which allows the + process definition to perform initialization logic when execution begins. + + :param identifier: Unique identifier for the process execution instance + :type identifier: str + :param module_name: Fully qualified module name containing the process definition + :type module_name: str + :param definition_class: Name of the process definition class to instantiate + :type definition_class: str + :param depends_on: + Optional job that this job depends on. If provided, + this job will not execute until the dependency completes. + :type depends_on: Optional[Job] + :return: Job object representing the enqueued task + :rtype: Job + """ @staticmethod def perform_start( @@ -36,6 +87,23 @@ def perform_start( module_name: str, definition_class: str, ): + """ + Execute the process start callback. + + Loads the process definition and invokes its start callback. This method + is typically called by the backend after the job has been dequeued. + + It is implemented as a staticmethod, to allow for the most flexibility in + backend implementations. 
+ + :param identifier: Unique identifier for the process execution instance + :type identifier: str + :param module_name: Fully qualified module name containing the process definition + :type module_name: str + :param definition_class: Name of the process definition class to instantiate + :type definition_class: str + """ + definition = load_definition(module_name, definition_class) definition.on_started(identifier) @@ -50,7 +118,32 @@ def enqueue_perform_task( kwargs: Dict[str, Any], depends_on: Optional[Job] = None, ) -> Job: # pragma: no cover - pass + """ + Enqueue a process task for execution. + + Schedules the execution of a specific task function within a process definition. + The task will be executed with the provided keyword arguments and can optionally + depend on another job. + + :param identifier: Unique identifier for the process execution instance + :type identifier: str + :param module_name: Fully qualified module name containing the process definition + :type module_name: str + :param definition_class: Name of the process definition class to instantiate + :type definition_class: str + :param function_name: Name of the task method to execute + :type function_name: str + :param description: Human-readable description of the task + :type description: Description + :param kwargs: Keyword arguments to pass to the task function + :type kwargs: Dict[str, Any] + :param depends_on: + Optional job that this job depends on. If provided, + this job will not execute until the dependency completes. + :type depends_on: Optional[Job] + :return: Job object representing the enqueued task + :rtype: Job + """ @classmethod def perform_task( @@ -63,6 +156,38 @@ def perform_task( description: Description, kwargs: Dict[str, Any], ): + """ + Execute a process task with comprehensive error handling. + + Loads the process definition, creates an execution context, and invokes the + specified task function. Handles three categories of errors: + + - NonRetryableProcessError: Logs error, calls on_failed callback, and re-raises + - CancelProcessError: Cancels the job and its dependents, calls on_cancelled callback + - Other exceptions: Logs error, calls on_failed callback, and re-raises for retry + + The task execution is wrapped with the definition's around_task context manager, + allowing the definition to perform setup and teardown logic. 
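+
+        As an illustration, a task can trigger the cancellation path described
+        above by raising ``CancelProcessError`` (a hedged sketch; the task name
+        and validation rule are illustrative, not part of this module)::
+
+            @task
+            def check_amount(self, amount: int, **kwargs):
+                if amount < 0:
+                    raise CancelProcessError("amount must not be negative")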
+ + :param job: Job object that can be cancelled or have dependents deleted + :type job: Job + :param identifier: Unique identifier for the process execution instance + :type identifier: str + :param module_name: Fully qualified module name containing the process definition + :type module_name: str + :param definition_class: Name of the process definition class to instantiate + :type definition_class: str + :param function_name: Name of the task method to execute + :type function_name: str + :param description: Human-readable description of the task + :type description: Description + :param kwargs: Keyword arguments to pass to the task function + :type kwargs: Dict[str, Any] + + :raises NonRetryableProcessError: When task execution raises a non-retryable error + :raises Exception: When task execution raises an unhandled error (retryable) + """ + definition = load_definition(module_name, definition_class) task_func = getattr(definition, function_name) context = Context() @@ -118,7 +243,25 @@ def enqueue_perform_complete( definition_class: str, depends_on: Optional[Union[Job, List[Job]]] = None, ) -> Job: # pragma: no cover - pass + """ + Enqueue a process completion callback. + + Schedules the execution of the process completion callback, which allows the + process definition to perform finalization logic when all tasks have completed. + Can depend on one or more jobs. + + :param identifier: Unique identifier for the process execution instance + :type identifier: str + :param module_name: Fully qualified module name containing the process definition + :type module_name: str + :param definition_class: Name of the process definition class to instantiate + :type definition_class: str + :param depends_on: Optional job or list of jobs that this job depends on. + If provided, this job will not execute until all dependencies complete. + :type depends_on: Optional[Union[Job, List[Job]]] + :return: Job object representing the enqueued task + :rtype: Job + """ @staticmethod def perform_complete( @@ -126,5 +269,23 @@ def perform_complete( module_name: str, definition_class: str, ): + """ + Execute the process completion callback. + + Loads the process definition and invokes its completion callback. This method + is typically called by the backend after all process tasks have completed + successfully. + + It is implemented as a staticmethod, to allow for the most flexibility in + backend implementations. + + :param identifier: Unique identifier for the process execution instance + :type identifier: str + :param module_name: Fully qualified module name containing the process definition + :type module_name: str + :param definition_class: Name of the process definition class to instantiate + :type definition_class: str + """ + definition = load_definition(module_name, definition_class) definition.on_completed(identifier) diff --git a/src/tasktronaut/backends/rq.py b/src/tasktronaut/backends/rq.py index 589fe80..5044fe4 100644 --- a/src/tasktronaut/backends/rq.py +++ b/src/tasktronaut/backends/rq.py @@ -1,3 +1,12 @@ +""" +RQ Backend implementation for Tasktronaut process execution. + +This module provides a concrete implementation of the Backend abstract base class +using the RQ (Redis Queue) job queue library. It enables asynchronous execution +of Tasktronaut processes with support for job dependencies and Redis-backed +job persistence. 
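+
+A minimal usage sketch (the queue name and local Redis connection are
+illustrative assumptions, not requirements of this backend)::
+
+    import rq
+    from redis import Redis
+
+    from tasktronaut.backends.rq import RqBackend
+
+    queue = rq.Queue("tasktronaut", connection=Redis())
+    backend = RqBackend(queue)
+
+    # enqueue a previously built process definition for execution
+    MyProcess.build().enqueue(backend)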
+""" + from typing import Any, Dict, List, Optional, Union, cast import rq @@ -8,7 +17,24 @@ # pylint: disable=too-many-arguments,too-many-positional-arguments class RqBackend(Backend): + """ + Backend implementation using RQ (Redis Queue) for process execution. + + This backend enqueues process start, task, and completion callbacks to an RQ queue + for asynchronous execution. It leverages RQ's job dependency features to ensure + proper task ordering and provides Redis-backed persistence of job state. + + :param queue: The RQ Queue instance to which jobs will be enqueued + :type queue: rq.Queue + """ + def __init__(self, queue: rq.Queue): + """ + Initialize the RQ backend with a queue instance. + + :param queue: The RQ Queue instance to which jobs will be enqueued + :type queue: rq.Queue + """ self.queue = queue def enqueue_perform_start( @@ -18,6 +44,25 @@ def enqueue_perform_start( definition_class: str, depends_on: Optional[Job] = None, ) -> Job: + """ + Enqueue a process start callback. + + Schedules the execution of the process start callback, which allows the + process definition to perform initialization logic when execution begins. + + :param identifier: Unique identifier for the process execution instance + :type identifier: str + :param module_name: Fully qualified module name containing the process definition + :type module_name: str + :param definition_class: Name of the process definition class to instantiate + :type definition_class: str + :param depends_on: + Optional job that this job depends on. If provided, + this job will not execute until the dependency completes. + :type depends_on: Optional[Job] + :return: Job object representing the enqueued task + :rtype: Job + """ return self.queue.enqueue( self.perform_start, identifier=identifier, @@ -36,6 +81,33 @@ def enqueue_perform_task( kwargs: Dict[str, Any], depends_on: Optional[Job] = None, ) -> Job: + """ + Enqueue a process task for execution to the RQ queue. + + Schedules the execution of a specific task function within a process definition + by enqueueing the rq_perform_task method to the configured RQ queue. Uses a + wrapper method to handle RQ-specific parameter handling. + + :param identifier: Unique identifier for the process execution instance + :type identifier: str + :param module_name: Fully qualified module name containing the process definition + :type module_name: str + :param definition_class: Name of the process definition class to instantiate + :type definition_class: str + :param function_name: Name of the task method to execute + :type function_name: str + :param description: Human-readable description of the task + :type description: Optional[str] + :param kwargs: Keyword arguments to pass to the task function + :type kwargs: Dict[str, Any] + :param depends_on: + Optional job that this job depends on. If provided, + this job will not execute until the dependency completes. + :type depends_on: Optional[Job] + :return: RQ Job object representing the enqueued task + :rtype: Job + """ + # NOTE: `description` and `kwargs` are used by rq return self.queue.enqueue( self.rq_perform_task, @@ -58,6 +130,26 @@ def rq_perform_task( task_description: Description, task_kwargs: Dict[str, Any], ): + """ + Execute a process task from within an RQ job context. + + This classmethod serves as the RQ job entry point for task execution. It retrieves + the current RQ job, and delegates to the perform_task method. 
+ + :param identifier: Unique identifier for the process execution instance + :type identifier: str + :param module_name: Fully qualified module name containing the process definition + :type module_name: str + :param definition_class: Name of the process definition class to instantiate + :type definition_class: str + :param function_name: Name of the task method to execute + :type function_name: str + :param task_description: Human-readable description of the task + :type task_description: Description + :param task_kwargs: Keyword arguments to pass to the task function + :type task_kwargs: Dict[str, Any] + """ + job = cast(Job, rq.get_current_job()) super().perform_task( @@ -77,6 +169,27 @@ def enqueue_perform_complete( definition_class: str, depends_on: Optional[Union[Job, List[Job]]] = None, ) -> Job: + """ + Enqueue a process completion callback to the RQ queue. + + Schedules the execution of the process completion callback by enqueueing the + perform_complete method to the configured RQ queue. The job will optionally + depend on one or more other jobs before execution. + + :param identifier: Unique identifier for the process execution instance + :type identifier: str + :param module_name: Fully qualified module name containing the process definition + :type module_name: str + :param definition_class: Name of the process definition class to instantiate + :type definition_class: str + :param depends_on: + Optional job or list of jobs that this job depends on. + If provided, this job will not execute until all dependencies complete. + :type depends_on: Optional[Union[Job, List[Job]]] + :return: RQ Job object representing the enqueued task + :rtype: Job + """ + return self.queue.enqueue( self.perform_complete, identifier=identifier, diff --git a/src/tasktronaut/builder.py b/src/tasktronaut/builder.py index a21dc04..d6e1620 100644 --- a/src/tasktronaut/builder.py +++ b/src/tasktronaut/builder.py @@ -1,3 +1,7 @@ +""" +Builder module. +""" + from contextlib import contextmanager from types import SimpleNamespace from typing import Any, Dict, Optional, TYPE_CHECKING, Type, cast @@ -21,6 +25,32 @@ class Builder: + """ + A builder class for constructing and configuring process workflows. + + The Builder provides a fluent interface for defining task execution graphs, + including sequential and concurrent execution patterns, argument validation, + iteration, and transformation capabilities. + + :param process: The process instance being built + :type process: Process + :param kwargs: Keyword arguments to pass to tasks and transformations + :type kwargs: Dict[str, Any] + :param options: Optional configuration options for the process + :type options: Optional[Options] + :param description: Optional description for the builder context + :type description: Description + + :ivar process: The process instance being constructed + :type process: Process + :ivar description: Description for the current builder context + :type description: Description + :ivar options: Configuration options converted to SimpleNamespace + :type options: SimpleNamespace + :ivar kwargs: Keyword arguments for task execution + :type kwargs: Dict[str, Any] + """ + def __init__( self, process: "Process", @@ -35,6 +65,14 @@ def __init__( @staticmethod def _convert_options(options: Optional[Options] = None) -> SimpleNamespace: + """ + Convert options to a SimpleNamespace for attribute-style access. 
+ + :param options: Options as dict, SimpleNamespace, or None + :type options: Optional[Options] + :return: Options as SimpleNamespace + :rtype: SimpleNamespace + """ if options: if isinstance(options, SimpleNamespace): return options @@ -43,6 +81,32 @@ def _convert_options(options: Optional[Options] = None) -> SimpleNamespace: return SimpleNamespace() def expected_arguments(self, **kwargs: Optional[Type]): + """ + Validate that expected arguments are present and of the correct type. + + This method checks that all specified arguments exist in the builder's + kwargs and validates their types using Pydantic type adapters. Type + validation is only performed when a non-None type is specified. + + :param kwargs: + Mapping of argument names to expected types. Use None to skip type + validation for an argument. + :type kwargs: Dict[str, Optional[Type]] + :raises TypeError: If a required argument is not found in kwargs + :raises ValueError: If an argument's type does not match the expected type + + .. note:: + This method does not check for additional arguments beyond those specified. + + Example:: + + builder.expected_arguments( + foo=str, + bar=int, + baz=None, # No type checking + qux=Optional[datetime.date] + ) + """ # NOTE: this doesn't check for additional arguments for name, expected_type in kwargs.items(): if name not in self.kwargs: @@ -60,12 +124,50 @@ def expected_arguments(self, **kwargs: Optional[Type]): @contextmanager def concurrent(self, description: Description = None) -> BuilderIterator: + """ + Create a concurrent execution block where tasks run in parallel. + + This context manager creates a new ConcurrentProcess that executes its + contained tasks concurrently. The resulting process is appended to the + parent process's steps. + + :param description: Optional description for the concurrent block + :type description: Description + :return: Builder instance for the concurrent process + :rtype: BuilderIterator + :yield: Builder configured for concurrent execution + + Example:: + + with builder.concurrent() as b: + b.task(task1) + b.task(task2) + """ from .process import ConcurrentProcess # pylint: disable=import-outside-toplevel yield from self._build_process(ConcurrentProcess, description) @contextmanager def sequential(self, description: Description = None) -> BuilderIterator: + """ + Create a sequential execution block where tasks run one after another. + + This context manager creates a new SequentialProcess that executes its + contained tasks sequentially. The resulting process is appended to the + parent process's steps. + + :param description: Optional description for the sequential block + :type description: Description + :return: Builder instance for the sequential process + :rtype: BuilderIterator + :yield: Builder configured for sequential execution + + Example:: + + with builder.sequential() as b: + b.task(task1) + b.task(task2) + """ from .process import SequentialProcess # pylint: disable=import-outside-toplevel yield from self._build_process(SequentialProcess, description) @@ -73,6 +175,20 @@ def sequential(self, description: Description = None) -> BuilderIterator: def _build_process( self, process_type: Type["Process"], description: Description = None ) -> BuilderIterator: + """ + Internal method to create a sub-process of a specific type. + + Creates a new builder with a process of the specified type, yields it + for configuration, then appends the configured process to the parent's steps. 
+
+        :param process_type: The type of process to create (Sequential or Concurrent)
+        :type process_type: Type[Process]
+        :param description: Optional description for the process
+        :type description: Description
+        :return: Builder instance for the new process
+        :rtype: BuilderIterator
+        :yield: Builder configured with the new process
+        """
         builder = Builder(
             process=process_type(
                 identifier=self.process.identifier,
@@ -90,6 +206,27 @@ def task(
         func: TaskMethod,
         description: Description = None,
     ):
+        """
+        Add a task to the process.
+
+        Creates a Step wrapping the provided function and appends it to the
+        process's steps. The task inherits the builder's kwargs and description
+        unless overridden.
+
+        :param func:
+            The function to execute as a task. Can be a regular method, a method
+            accepting a Context parameter, or a ``@task`` decorated method.
+        :type func: TaskMethod
+        :param description:
+            Optional description for the task. If not provided, uses the function's
+            'description' attribute or the builder's description.
+        :type description: Description
+
+        Example::
+
+            builder.task(my_task)
+            builder.task(my_task, description="Custom description")
+        """
         self.process.steps.append(
             Step(
                 func=func,
@@ -105,6 +242,23 @@ def sub_process(
         definition: ProcessDefinitionType,
         description: Description = None,
     ):
+        """
+        Embed a sub-process within the current process.
+
+        Creates and configures a new process based on the provided ``ProcessDefinition``,
+        then appends it to the parent process's steps. The sub-process inherits
+        the current options and kwargs.
+
+        :param definition: A ProcessDefinition class defining the sub-process
+        :type definition: ProcessDefinitionType
+        :param description: Optional description for the sub-process
+        :type description: Description
+
+        Example::
+
+            builder.sub_process(MySubProcess)
+            builder.sub_process(MySubProcess, description="Data validation")
+        """
         builder = Builder(
             process=definition.execution_mode.value(
                 identifier=self.process.identifier,
@@ -122,6 +276,38 @@ def each(
         func: ForEachMethod,
         description: Description = None,
     ) -> BuilderIterator:
+        """
+        Iterate over the items yielded by the given ``func`` method, yielding a
+        builder for each iteration.
+
+        Executes the provided function to generate items, then yields a Builder
+        instance for each item with updated kwargs.
+
+        The provided function can yield either a dict of kwargs or a tuple of
+        (kwargs, description).
+
+        :param func:
+            A function that yields dicts or (dict, str) tuples. The function receives the
+            current kwargs as parameters.
+        :type func: ForEachMethod
+        :param description: Optional default description for iterations
+        :type description: Description
+        :return: Iterator of Builder instances, one per yielded item
+        :rtype: BuilderIterator
+        :yield: Builder with kwargs updated for each iteration
+
+        Example::
+
+            def items(self, count: int, **_):
+                for i in range(count):
+                    yield {"index": i}
+
+            for b in builder.each(self.items):
+                b.task(task_for_item)
+
+        .. note::
+            The provided function can yield either ``dict`` or ``(dict, str)`` tuples,
+            where the string provides a per-item description.
+        """
         for kwargs in func(**self.kwargs.copy()):
             kwargs, desc = kwargs if isinstance(kwargs, tuple) else (kwargs, None)
             yield Builder(
@@ -137,6 +323,36 @@ def transform(
         func: TransformMethod,
         description: Description = None,
     ) -> BuilderIterator:
+        """
+        Transform kwargs for tasks within a context block. 
+ + Executes the provided function to generate new kwargs, then yields a Builder + with the transformed kwargs. Tasks defined within the context use the + transformed arguments. The function can return either a dict or a tuple + of (dict, description). + + :param func: + A function that returns a dict of new kwargs or a (dict, str) tuple. The function + receives the current kwargs as parameters. + :type func: TransformMethod + :param description: Optional description for the transformation context + :type description: Description + :return: Builder instance with transformed kwargs + :rtype: BuilderIterator + :yield: Builder configured with transformed kwargs + + Example:: + + def double_value(self, value: int, **_): + return {"value": value * 2} + + with builder.transform(double_value) as t: + t.task(task_with_doubled_value) + + .. note:: + The transform function can return either ``dict`` or ``(dict, str)`` + where the string overrides the description for the context. + """ kwargs = func(**self.kwargs.copy()) kwargs, desc = kwargs if isinstance(kwargs, tuple) else (kwargs, None) @@ -148,4 +364,22 @@ def transform( ) def option(self, name: str, default: Optional[Value] = None) -> Value: + """ + Retrieve an option value with optional default. + + Accesses configuration options passed to the builder, returning the + specified option's value or a default if the option is not set. + + :param name: The name of the option to retrieve + :type name: str + :param default: Default value to return if option is not found + :type default: Optional[Value] + :return: The option's value or the default + :rtype: Value + + Example:: + + if builder.option("debug", False): + builder.task(verbose_logging) + """ return cast(Value, getattr(self.options, name, default)) diff --git a/src/tasktronaut/context.py b/src/tasktronaut/context.py index dfdfddf..12e63a4 100644 --- a/src/tasktronaut/context.py +++ b/src/tasktronaut/context.py @@ -1,2 +1,17 @@ +""" +Context module. +""" + + class Context: # pylint: disable=too-few-public-methods - pass + """ + A context object that carries execution information during process execution. + + The Context class serves as a container for state and metadata that may be + needed by tasks during process execution. Tasks can optionally accept a + Context parameter to access execution-specific information and maintain + state across the process. + + This is a minimal class designed to be extended or used as a base for + storing execution context data passed between tasks in a Tasktronaut process. + """ diff --git a/src/tasktronaut/decorators.py b/src/tasktronaut/decorators.py index 202d42a..5d66c24 100644 --- a/src/tasktronaut/decorators.py +++ b/src/tasktronaut/decorators.py @@ -1,3 +1,7 @@ +""" +Decorators module. +""" + from typing import Callable, Optional, Union, cast, overload from .types import DecoratedTaskMethod, Description, UndecoratedTaskMethod @@ -19,6 +23,44 @@ def task( *, description: Description = None, ) -> Union[Callable[[UndecoratedTaskMethod], DecoratedTaskMethod], DecoratedTaskMethod]: + """ + Decorator for marking methods as tasks within a process definition. + + This decorator can be applied to methods with or without arguments and + optionally assigns a description to the task. When applied, it attaches + metadata to the decorated function that identifies it as a task and stores + any provided description. + + :param func: + The undecorated task method. This parameter is only provided + when the decorator is used without parentheses (bare decorator form). 
When None, the decorator is being used with keyword arguments and returns
+        a decorator function.
+    :type func: :class:`UndecoratedTaskMethod` or None
+    :param description:
+        An optional description of the task's purpose and behavior.
+        This description may be used for logging, documentation generation, or
+        user-facing displays of the process flow.
+    :type description: :class:`Description` or None
+    :return:
+        Either the decorated method (when used as a bare decorator) or a
+        decorator function (when used with keyword arguments).
+    :rtype: :class:`DecoratedTaskMethod` or :class:`Callable`
+
+    .. note::
+        The decorator supports two usage patterns:
+
+        1. **Bare decorator**: Applied directly without parentheses or with empty
+           parentheses, where no description is provided.
+        2. **Parameterized decorator**: Applied with the ``description`` keyword
+           argument to specify a task description.
+
+    .. seealso::
+        :class:`UndecoratedTaskMethod` -- Type for undecorated task methods
+        :class:`DecoratedTaskMethod` -- Type for decorated task methods
+        :class:`Description` -- Type for task descriptions
+    """
+
     def _wrapper(f: UndecoratedTaskMethod) -> DecoratedTaskMethod:
         setattr(f, "description", description)
         return cast(DecoratedTaskMethod, f)
diff --git a/src/tasktronaut/errors.py b/src/tasktronaut/errors.py
index 63f04b1..5f9f7ce 100644
--- a/src/tasktronaut/errors.py
+++ b/src/tasktronaut/errors.py
@@ -1,6 +1,31 @@
+"""
+Errors module.
+"""
+
+
 class NonRetryableProcessError(Exception):  # pragma: no cover
-    pass
+    """Exception to indicate a process error that should not be retried.
+
+    This exception is used to signal that a process has encountered an error
+    condition that is permanent and attempting to retry the process would be
+    futile. When this exception is raised during process execution, the process
+    will terminate without attempting any configured retry logic.
+
+    :raises NonRetryableProcessError: When a process encounters an unrecoverable error.
+    """
 
 
 class CancelProcessError(Exception):  # pragma: no cover
-    pass
+    """Raised to cancel the execution of a process.
+
+    This exception is used to signal that a process should be cancelled and
+    terminated immediately. When raised during process execution, it will halt
+    the current process and any pending tasks without completing the normal
+    process flow.
+
+    Whilst a ``NonRetryableProcessError`` has the same effect of halting the
+    execution of a process, ``CancelProcessError`` additionally causes any
+    already enqueued tasks to be removed from the queue.
+
+    :raises CancelProcessError: When a process needs to be cancelled.
+    """
diff --git a/src/tasktronaut/process.py b/src/tasktronaut/process.py
index 15144c6..b229a73 100644
--- a/src/tasktronaut/process.py
+++ b/src/tasktronaut/process.py
@@ -1,3 +1,7 @@
+"""
+Process module.
+"""
+
 import enum
 import logging
 import uuid
@@ -16,22 +20,82 @@
 
 
 class Process(ABC):
+    """Abstract base class for process execution workflows.
+
+    A Process represents an executable workflow composed of multiple steps that can be
+    enqueued and executed by a backend. Processes are created with an identifier and
+    a process definition that specifies the workflow structure.
+
+    This is an abstract base class that must be subclassed to implement specific
+    execution strategies (e.g., sequential, concurrent). 
+ + :param identifier: Unique identifier for this process instance + :type identifier: str + :param definition: The ProcessDefinition class that defines the workflow structure + :type definition: Type[ProcessDefinition] + + :ivar identifier: Unique identifier for this process instance + :type identifier: str + :ivar definition: The ProcessDefinition class associated with this process + :type definition: Type[ProcessDefinition] + :ivar steps: Collection of steps that comprise this process workflow + :type steps: Steps + """ + def __init__(self, identifier: str, definition: Type["ProcessDefinition"]): + """Initialize a new Process instance. + + :param identifier: Unique identifier for this process instance + :type identifier: str + :param definition: The ProcessDefinition class that defines the workflow + :type definition: Type[ProcessDefinition] + """ self.identifier = identifier self.definition: Type[ProcessDefinition] = definition self.steps = Steps() @property def is_process(self) -> bool: + """Check if this object is a Process. + + This property provides a consistent way to identify Process instances + throughout the framework. + + :return: Always returns True + :rtype: bool + """ return True @abstractmethod def enqueue( self, backend: Backend, start_job: Optional[Job] = None ) -> Job: # pragma: no cover - pass + """Enqueue this process for execution on the specified backend. + + This abstract method must be implemented by subclasses to define how the + process and its steps are queued for execution. Different execution modes + (sequential, concurrent) will have different enqueueing strategies. + + :param backend: The execution backend that will run the process + :type backend: Backend + :param start_job: + Optional job to execute before this process begins. + Used for chaining processes together. + :type start_job: Optional[Job] + + :return: A Job representing the enqueued process execution + :rtype: Job + + :raises NotImplementedError: If subclass does not implement this method + + .. note:: + Subclasses must implement this method to define their specific + execution strategy (sequential vs concurrent ordering, dependency + management, etc.) + """ def __str__(self): + """Return a human-readable string representation of the Process.""" return ( f"{self.__class__.__name__}(" f"id={self.identifier}," @@ -41,6 +105,7 @@ def __str__(self): ) def __repr__(self): + """Return a developer-focused string representation of the Process.""" return ( f"{self.__class__.__name__}(" f"id={self.identifier}," @@ -51,7 +116,15 @@ def __repr__(self): class SequentialProcess(Process): + """A process that executes its steps in sequential order. + + SequentialProcess is a concrete implementation of the Process base class that + enqueues steps for execution in a strict sequential manner. Each step must + complete before the next step begins, creating a chain of dependent jobs. + """ + def enqueue(self, backend: Backend, start_job: Optional[Job] = None) -> Job: + """Enqueue this process for execution on the specified backend.""" base_kwargs = to_dict( identifier=self.identifier, module_name=self.definition.__module__, @@ -83,7 +156,16 @@ def enqueue(self, backend: Backend, start_job: Optional[Job] = None) -> Job: class ConcurrentProcess(Process): + """A process that executes its steps concurrently in parallel. + + ConcurrentProcess is a concrete implementation of the Process base class that + enqueues steps for parallel execution. 
All steps depend only on the initial
+    start job and can execute simultaneously, with a final completion job that
+    depends on all step jobs finishing.
+    """
+
     def enqueue(self, backend: Backend, start_job: Optional[Job] = None) -> Job:
+        """Enqueue this process for execution on the specified backend."""
         base_kwargs = to_dict(
             identifier=self.identifier,
             module_name=self.definition.__module__,
@@ -116,17 +198,83 @@
 
 
 class ExecutionMode(enum.Enum):
+    """
+    Defines the available execution modes for the process.
+
+    The execution mode determines how tasks within a process are executed.
+    Sequential execution runs tasks one after another in order, while
+    concurrent execution allows tasks to run in parallel.
+    """
+
     SEQUENTIAL = SequentialProcess
+    """
+    Sequential execution mode. Tasks are executed one at a time in the
+    order they are defined. Each task completes before the next begins.
+    This is the default execution mode for processes.
+    """
+
     CONCURRENT = ConcurrentProcess
+    """
+    Concurrent execution mode. Tasks are executed in parallel, allowing
+    multiple tasks to run simultaneously. This mode is suitable for processes
+    with independent tasks that can benefit from parallel execution.
+    """
 
 
 class ProcessDefinition(ABC):
+    """Base class for defining workflow processes in Tasktronaut.
+
+    This class serves as the foundation for creating custom process definitions.
+    Subclasses must implement the :meth:`define_process` method to specify the
+    workflow structure using the provided :class:`Builder` instance.
+
+    :ivar description: Optional description of the process. Defaults to None.
+    :type description: Optional[Description]
+    :ivar execution_mode:
+        The execution mode for the process (SEQUENTIAL or CONCURRENT).
+        Defaults to SEQUENTIAL.
+    :type execution_mode: ExecutionMode
+
+    Example::
+
+        class MyProcess(ProcessDefinition):
+            execution_mode = ExecutionMode.SEQUENTIAL
+
+            def define_process(self, builder: Builder):
+                builder.task(self.my_task)
+
+            @task
+            def my_task(self, context, **kwargs):
+                print("Executing task")
+    """
+
     description: Description = None
+    """Optional description of the process. Defaults to None."""
+
     execution_mode: ExecutionMode = ExecutionMode.SEQUENTIAL
+    """The execution mode for the process (SEQUENTIAL or CONCURRENT)."""
 
     @abstractmethod
     def define_process(self, builder: Builder):  # pragma: no cover
-        pass
+        """Define the workflow process structure.
+
+        This method must be implemented by subclasses to specify the
+        tasks and execution flow of the process using the provided builder.
+
+        :param builder:
+            The builder instance used to construct the process workflow.
+        :type builder: Builder
+        :raises NotImplementedError: If not implemented by subclass.
+
+        Example::
+
+            def define_process(self, builder: Builder):
+                builder.task(self.first_task)
+                with builder.concurrent():
+                    builder.task(self.second_task)
+                    builder.task(self.third_task)
+                builder.task(self.fourth_task)
+        """
 
     @classmethod
     def build(
@@ -135,6 +283,36 @@ def build(
         options: Optional[Options] = None,
         **kwargs,
     ) -> Process:
+        """Build and return a Process instance from this definition.
+
+        This class method constructs a complete process by instantiating the
+        definition class, invoking :meth:`define_process`, and returning the
+        fully configured process.
+
+        :param identifier:
+            Unique identifier for the process instance. If not provided,
+            a random UUID hex string is generated.
+        :type identifier: Optional[str]
+        :param options:
+            Configuration options to pass to the builder for conditional
+            process logic.
+        :type options: Optional[Options]
+        :param kwargs:
+            Additional keyword arguments passed to tasks during execution.
+        :type kwargs: Dict[str, Any]
+        :return: A fully configured process ready for execution.
+        :rtype: Process
+
+        Example::
+
+            process = MyProcess.build(
+                identifier="my-process-001",
+                options={"debug": True},
+                user_id=123
+            )
+        """
+
         builder = Builder(
             process=cls.execution_mode.value(
                 identifier=identifier or uuid.uuid4().hex,
@@ -146,7 +324,20 @@ def build(
         cls().define_process(builder)
         return builder.process
 
     def on_started(self, identifier: str):
+        """Lifecycle hook called when the process starts execution.
+
+        This method is invoked automatically when process execution begins.
+        Override to implement custom startup logic such as resource initialization
+        or logging.
+
+        :param identifier: The unique identifier of the process instance.
+        :type identifier: str
+
+        Note:
+            The default implementation logs an informational message indicating
+            the process has started.
+        """
         logger.info("%s started [%s].", self.__class__.__name__, identifier)
 
     @contextmanager
@@ -157,6 +348,39 @@ def around_task(
         description: Description,
         context: Context,  # pylint: disable=unused-argument
     ) -> Generator[None, None, None]:
+        """Lifecycle hook called around an executing task. It is a context manager
+        that wraps individual task execution.
+
+        This method provides a context for task execution, allowing pre- and
+        post-task actions such as logging, metrics collection, or resource
+        management. Override to implement custom behavior around task execution.
+
+        :param identifier: The unique identifier of the process instance.
+        :type identifier: str
+        :param step: The step identifier within the process.
+        :type step: str
+        :param description: Human-readable description of the task being executed.
+        :type description: Description
+        :param context:
+            The context for the invocation of the task. This object can be used to pass
+            additional metadata to the task if needed.
+        :type context: Context
+
+        :yield: Control is yielded during task execution.
+        :rtype: Generator[None, None, None]
+
+        Example::
+
+            @contextmanager
+            def around_task(self, identifier, step, description, context):
+                start_time = time.time()
+                with super().around_task(identifier, step, description, context):
+                    yield
+                duration = time.time() - start_time
+                logger.info(f"Task took {duration:.2f}s")
+
+        Note:
+            The default implementation logs when task execution begins and completes.
+        """
         logger.info(
             "Executing '%s' (%s) of %s process [%s].",
             description,
@@ -176,9 +422,45 @@ def around_task(
         )
 
     def on_completed(self, identifier: str):
+        """Lifecycle hook called when the process completes successfully.
+
+        This method is invoked automatically when all process tasks complete
+        without errors. Override to implement custom completion logic such as
+        cleanup, notifications, or result processing.
+
+        :param identifier: The unique identifier of the process instance.
+        :type identifier: str
+
+        Note:
+            The default implementation logs an informational message indicating
+            the process has completed.
+        """
         logger.info("%s completed [%s].", self.__class__.__name__, identifier)
 
     def on_failed(self, identifier: str, step: str, e: Exception):
+        """Lifecycle hook called when the process fails due to an exception.
+
+        This method is invoked automatically when a task raises an exception
+        during process execution. 
Override to implement custom error handling + such as rollback operations, error notifications, or recovery attempts. + + :param identifier: The unique identifier of the process instance. + :type identifier: str + :param step: The step identifier where the failure occurred. + :type step: str + :param e: The exception that caused the failure. + :type e: Exception + + Note: + The default implementation logs an error message with the exception details. + """ logger.error( "%s failed executing %s step [%s].\n%s", self.__class__.__name__, @@ -188,4 +441,17 @@ def on_failed(self, identifier: str, step: str, e: Exception): ) def on_cancelled(self, identifier: str): + """Lifecycle hook called when the process is cancelled. + + This method is invoked automatically when process execution is + deliberately cancelled before completion. Override to implement custom + cancellation logic such as cleanup or state restoration. + + :param identifier: The unique identifier of the process instance. + :type identifier: str + + Note: + The default implementation logs a warning message indicating + the process was cancelled. + """ logger.warning("%s cancelled [%s].", self.__class__.__name__, identifier) diff --git a/src/tasktronaut/steps.py b/src/tasktronaut/steps.py index a4f0e49..314e4e4 100644 --- a/src/tasktronaut/steps.py +++ b/src/tasktronaut/steps.py @@ -1,3 +1,7 @@ +""" +Steps module. +""" + from typing import Any, Dict, TYPE_CHECKING, Union from .types import Description, TaskMethod @@ -7,26 +11,75 @@ class Step: + """ + Represents a single executable step in a process. + + A ``Step`` encapsulates a task function along with its metadata and execution parameters. + + Steps form the building blocks of process definitions and are executed by the process + engine according to the defined execution mode. + + :ivar func: The callable function or method that performs the task + :type func: TaskMethod + :ivar description: Human-readable description of the step + :type description: Description + :ivar kwargs: Keyword arguments to be passed to the function during execution + :type kwargs: Dict[str, Any] + + :param func: The task function to execute + :type func: TaskMethod + :param description: A description of what this step does + :type description: Description + :param kwargs: Additional keyword arguments for the task function + :type kwargs: Dict[str, Any] + """ + def __init__( self, func: TaskMethod, description: Description, kwargs: Dict[str, Any], ): + """ + Initialize a Step instance. + + :param func: The callable function or method to be executed as a task + :type func: TaskMethod + :param description: A description of the step's purpose + :type description: Description + :param kwargs: Keyword arguments to pass to the function + :type kwargs: Dict[str, Any] + """ self.func = func self.description = description self.kwargs = kwargs @property def is_process(self) -> bool: + """ + Determine if this step represents a process. + + :return: Always returns ``False`` since a Step is not a Process + :rtype: bool + """ return False def __str__(self) -> str: + """String representation of the Step.""" return f"{self.__class__.__name__}(func={self.func.__name__},kwargs={self.kwargs!r},)" def __repr__(self) -> str: + """String representation of the Step.""" return f"{self.__class__.__name__}(func={self.func.__name__},kwargs={self.kwargs!r},)" class Steps(list[Union[Step, "Process"]]): # pragma: no cover - pass + """ + A container for holding a collection of steps and sub-processes. 
+ + This class extends the built-in list type to provide a typed container that holds + both Step instances and Process instances. It is used internally to manage the + collection of execution items within a process definition. + + :inherits: list[Union[Step, Process]] + """ diff --git a/src/tasktronaut/types.py b/src/tasktronaut/types.py index 4366f88..59e6834 100644 --- a/src/tasktronaut/types.py +++ b/src/tasktronaut/types.py @@ -1,3 +1,7 @@ +""" +Type definitions and protocols. +""" + from types import SimpleNamespace from typing import ( Any, @@ -20,34 +24,148 @@ from .context import Context UndecoratedTaskMethod: TypeAlias = Callable[..., None] +""" +Type alias for an undecorated task method. + +Represents a callable that accepts any arguments and keyword arguments, +and returns None. These are regular methods that can be registered as +tasks without explicit decoration. + +:type: Callable[..., None] +""" class DecoratedTaskMethod(Protocol): # pylint: disable=too-few-public-methods + """ + Protocol for a task method decorated with the ``@task`` decorator. + + Defines the interface that decorated task methods must implement. Decorated + tasks have metadata attributes and can optionally accept a ``Context`` parameter. + + :ivar __name__: The name of the task method. + :type __name__: str + :ivar description: An optional human-readable description of the task. + :type description: Optional[str] + """ + __name__: str description: Optional[str] def __call__( self, *args, context: Optional["Context"] = None, **kwargs ): # pragma: no cover - pass + """ + Task method signature. + + :param args: Positional arguments to pass to the task. + :type args: Any + :param context: An optional execution context for the task. + :type context: Optional[Context] + :param kwargs: Keyword arguments to pass to the task. + :type kwargs: Dict[str, Any] + """ TaskMethod: TypeAlias = Union[UndecoratedTaskMethod, DecoratedTaskMethod] +""" +Type alias for any task method. + +Represents either a decorated or undecorated task method. This is the primary +type used when registering methods as tasks with a builder. + +:type: Union[UndecoratedTaskMethod, DecoratedTaskMethod] +""" Description: TypeAlias = Optional[str] +""" +Type alias for an optional task or item description. + +Used to provide human-readable descriptions for tasks, items in iterations, +or other process elements. + +:type: Optional[str] +""" TaskArgs: TypeAlias = Union[Dict[str, Any], Tuple[Dict[str, Any], Description]] +""" +Type alias for task arguments. + +Represents arguments passed to a task, which can be either a dictionary +of arguments or a tuple containing a dictionary of arguments and an +optional description string. + +:type: Union[Dict[str, Any], Tuple[Dict[str, Any], Description]] +""" ForEachReturn: TypeAlias = Iterator[TaskArgs] +""" +Type alias for the return type of the ``each`` method. + +Represents an iterator that yields task arguments. Used in the ``each`` +pattern to iterate over a collection of items, yielding arguments for +each iteration. + +:type: Iterator[TaskArgs] +""" ForEachMethod: TypeAlias = Callable[..., ForEachReturn] +""" +Type alias for the ``each`` method. + +Represents a callable that accepts any arguments and keyword arguments, +and returns an iterator of task arguments. Used to define iteration +methods for the ``each`` pattern. + +:type: Callable[..., ForEachReturn] +""" TransformMethod: TypeAlias = Callable[..., TaskArgs] +""" +Type alias for a transformation method. 
+ +Represents a callable that accepts any arguments and keyword arguments, +and returns task arguments. Used to define transformation methods for +the ``transform`` pattern to modify arguments before task execution. + +:type: Callable[..., TaskArgs] +""" BuilderIterator: TypeAlias = Iterator["Builder"] +""" +Type alias for an iterator of ``Builder`` instances. + +Represents an iterator that yields Builder objects, typically used in +control flow patterns such as sequential, concurrent, or each blocks. + +:type: Iterator[Builder] +""" ProcessDefinitionType: TypeAlias = Type["ProcessDefinition"] +""" +Type alias for a ``ProcessDefinition`` class. + +Represents a class type that inherits from ProcessDefinition, used when +specifying sub-processes or other process references. + +:type: Type[ProcessDefinition] +""" Options: TypeAlias = Union[SimpleNamespace, Dict[str, Any]] +""" +Type alias for process options. + +Represents configuration options that can be passed to a process, either +as a ``SimpleNamespace`` object or as a dictionary. + +:type: Union[SimpleNamespace, Dict[str, Any]] +""" Value = TypeVar("Value") +""" +Generic type variable for generic operations. + +Used in generic function or class definitions where a type parameter +is needed to represent any arbitrary type. + +:type: TypeVar +""" diff --git a/src/tasktronaut/utils.py b/src/tasktronaut/utils.py index d6c9d75..880704d 100644 --- a/src/tasktronaut/utils.py +++ b/src/tasktronaut/utils.py @@ -1,3 +1,7 @@ +""" +Utility and supporting methods. +""" + import importlib from typing import Any, TYPE_CHECKING, cast @@ -7,10 +11,37 @@ def to_dict(**kwargs: Any) -> dict[str, Any]: + """ + Convert keyword arguments to a dictionary. + + This utility function packages keyword arguments into a dictionary, + providing a convenient way to convert variadic keyword arguments + into a dictionary structure. + + :param kwargs: Arbitrary keyword arguments to convert + :type kwargs: Dict[str, Any] + :return: Dictionary containing all passed keyword arguments + :rtype: dict[str, Any] + """ return kwargs def to_kwargs(base: dict[str, Any], **kwargs: Any) -> dict[str, Any]: + """ + Merge a base dictionary with additional keyword arguments. + + Creates a new dictionary by copying the base dictionary and updating + it with the provided keyword arguments. The original base dictionary + is not modified. Keyword arguments take precedence over base dictionary + values in case of key conflicts. + + :param base: Base dictionary to copy and update + :type base: dict[str, Any] + :param kwargs: Keyword arguments to merge into the base dictionary + :type kwargs: Dict[str, Any] + :return: New dictionary containing merged key-value pairs + :rtype: dict[str, Any] + """ d = base.copy() d.update(kwargs) return d @@ -20,6 +51,24 @@ def load_definition( module_name: str, definition_class: str, ) -> "ProcessDefinition": + """ + Dynamically load and instantiate a process definition class. + + This function provides dynamic loading of ProcessDefinition subclasses + from specified modules. It imports the module by name, retrieves the + definition class from that module, and returns a new instance of the class. 
+ + :param module_name: Fully qualified module name to import + :type module_name: str + :param definition_class: Name of the ProcessDefinition class to load from the module + :type definition_class: str + :return: A new instance of the specified ProcessDefinition class + :rtype: ProcessDefinition + + :raises ImportError: If the module cannot be imported + :raises AttributeError: If the definition_class does not exist in the module + :raises TypeError: If the loaded class cannot be instantiated + """ module = importlib.import_module(module_name) definition_type = getattr(module, definition_class) # return new instance of definition type From 611fb9a983db30465eab5752a26373cb25000f4f Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Wed, 12 Nov 2025 17:33:53 +0000 Subject: [PATCH 08/15] add security note --- src/tasktronaut/utils.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/tasktronaut/utils.py b/src/tasktronaut/utils.py index 880704d..61f232f 100644 --- a/src/tasktronaut/utils.py +++ b/src/tasktronaut/utils.py @@ -69,6 +69,12 @@ def load_definition( :raises AttributeError: If the definition_class does not exist in the module :raises TypeError: If the loaded class cannot be instantiated """ + + # SECURITY: make sure you trust the module that gets loaded! + # consider using an allowed list of module names to + # prevent unknown modules (and definitions) from being + # loaded dynamically. + module = importlib.import_module(module_name) definition_type = getattr(module, definition_class) # return new instance of definition type From 77460a2218ca94b06cd30f32fdf69412b8cc7818 Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Wed, 12 Nov 2025 18:04:38 +0000 Subject: [PATCH 09/15] provide `__version__` attribute --- src/tasktronaut/__init__.py | 8 ++++++++ tests/test_utils.py | 6 +++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/tasktronaut/__init__.py b/src/tasktronaut/__init__.py index 789fefc..21cdba0 100644 --- a/src/tasktronaut/__init__.py +++ b/src/tasktronaut/__init__.py @@ -9,12 +9,19 @@ the library. 
""" +from importlib.metadata import PackageNotFoundError, version + from .backend import Backend from .builder import Builder from .context import Context from .decorators import task from .process import ExecutionMode, ProcessDefinition +try: + __version__ = version(__name__) +except PackageNotFoundError: + __version__ = "0.0.0.dev" + __all__ = ( "Backend", "Builder", @@ -22,4 +29,5 @@ "ExecutionMode", "ProcessDefinition", "task", + "__version__", ) diff --git a/tests/test_utils.py b/tests/test_utils.py index d0ad434..b42d600 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,10 +1,14 @@ import unittest -from tasktronaut.utils import load_definition, to_kwargs, to_dict +import tasktronaut +from tasktronaut.utils import load_definition, to_dict, to_kwargs from .examples import SimpleSequential class UtilsTestCase(unittest.TestCase): + def test_version(self): + self.assertNotEqual("0.0.0.dev", tasktronaut.__version__) + def test_to_dict(self): result = to_dict(foo=1) self.assertIsNotNone(result) From 639d95dd722e7c9ebc3014885cffafb17d820743 Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Wed, 12 Nov 2025 18:04:25 +0000 Subject: [PATCH 10/15] getting started and user guide --- docs/source/getting-started.rst | 96 +++++++++ docs/source/index.rst | 5 +- docs/source/user-guide.rst | 364 ++++++++++++++++++++++++++++++++ docs/source/user_guide.rst | 2 - 4 files changed, 464 insertions(+), 3 deletions(-) create mode 100644 docs/source/getting-started.rst create mode 100644 docs/source/user-guide.rst delete mode 100644 docs/source/user_guide.rst diff --git a/docs/source/getting-started.rst b/docs/source/getting-started.rst new file mode 100644 index 0000000..23ead92 --- /dev/null +++ b/docs/source/getting-started.rst @@ -0,0 +1,96 @@ +Getting Started with Tasktronaut +================================= + +Welcome to Tasktronaut! + +This guide will help you install the library, set up your first process, and run it successfully. + +Installation +------------ + +Prerequisites +~~~~~~~~~~~~~ + +Tasktronaut requires: + +- Python 3.7 or higher +- pip (Python package manager) + +Installing from PyPI +~~~~~~~~~~~~~~~~~~~~ + +The easiest way to install Tasktronaut is using pip: + +.. code-block:: bash + + pip install tasktronaut[rq] + +For a specific version: + +.. code-block:: bash + + pip install tasktronaut[rq]==0.3.0 + +Installing with Optional Dependencies +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Tasktronaut may offer optional extras for extended functionality. Install them as follows: + +.. code-block:: bash + + pip install tasktronaut[dev] + pip install tasktronaut[docs] + +Installing from Source +~~~~~~~~~~~~~~~~~~~~~~ + +To install the development version from the repository: + +.. code-block:: bash + + git clone https://github.com/virtualstaticvoid/tasktronaut.git + cd tasktronaut + uv sync + +Verifying Your Installation +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Verify the installation was successful by importing Tasktronaut in a Python shell: + +.. code-block:: python + + >>> import tasktronaut + >>> print(tasktronaut.__version__) + x.x.x + +If you see the version number without errors, you're ready to go! + + +Your First Process +------------------ + +Creating a Simple Process +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Let's create a basic process with three sequential tasks. Create a file called ``my_first_process.py``: + +.. 
+
+    from tasktronaut import ProcessDefinition, Builder
+
+    class GreetingProcess(ProcessDefinition):
+        """A simple process that greets the user."""
+
+        def define_process(self, builder: Builder):
+            builder.task(self.say_hello)
+            builder.task(self.say_name)
+            builder.task(self.say_goodbye)
+
+        def say_hello(self):
+            print("Hello!")
+
+        def say_name(self, name: str = "World", **kwargs):
+            print(f"Nice to meet you, {name}.")
+
+        def say_goodbye(self):
+            print("Goodbye!")
diff --git a/docs/source/index.rst b/docs/source/index.rst
index a582269..fccbe2f 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -6,11 +6,14 @@
 ``tasktronaut`` Documentation
 =============================
 
+Welcome to Tasktronaut!
+
 .. toctree::
    :maxdepth: 1
    :caption: Contents:
 
-   user_guide
+   getting-started
+   user-guide
    advanced
    api/modules
diff --git a/docs/source/user-guide.rst b/docs/source/user-guide.rst
new file mode 100644
index 0000000..07a12d9
--- /dev/null
+++ b/docs/source/user-guide.rst
@@ -0,0 +1,364 @@
+User Guide
+==========
+
+Introduction
+------------
+
+Tasktronaut is a Python library for defining and executing processes composed of individual tasks.
+
+It provides a declarative API for organizing tasks into complex workflows with support for sequential
+execution, concurrent execution, conditional logic, iteration, and nested sub-processes.
+
+Getting Started
+---------------
+
+Basic Process Definition
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+To create a process, subclass ``ProcessDefinition`` and implement the ``define_process`` method:
+
+.. code-block:: python
+
+    from tasktronaut import ProcessDefinition, Builder
+
+    class MyProcess(ProcessDefinition):
+        def define_process(self, builder: Builder):
+            builder.task(self.step_one)
+            builder.task(self.step_two)
+            builder.task(self.step_three)
+
+        def step_one(self):
+            print("Executing step one")
+
+        def step_two(self):
+            print("Executing step two")
+
+        def step_three(self):
+            print("Executing step three")
+
+The ``builder`` object is used to add tasks and structure your process flow.
+
+
+Tasks
+-----
+
+Defining Tasks
+~~~~~~~~~~~~~~
+
+Tasks are methods that perform work within a process. There are several ways to define tasks:
+
+**Simple Methods:**
+
+Any method can be used as a task:
+
+.. code-block:: python
+
+    def my_task(self, *args, **kwargs):
+        # Task implementation
+        pass
+
+**Context-Aware Methods:**
+
+Methods can receive a ``Context`` object to access execution information:
+
+.. code-block:: python
+
+    from tasktronaut import Context
+
+    def my_task(self, context: Context, **kwargs):
+        # Access runtime information via context
+        pass
+
+**Task Decorator:**
+
+Use the ``@task`` decorator to add metadata like descriptions:
+
+.. code-block:: python
+
+    from tasktronaut import task
+
+    @task(description="Processes data input")
+    def my_task(self, *args, **kwargs):
+        pass
+
+    @task()
+    def another_task(self):
+        pass
+
+    @task
+    def simple_task(self):
+        pass
+
+
+Execution Modes
+---------------
+
+Sequential Execution
+~~~~~~~~~~~~~~~~~~~~
+
+By default, tasks execute one after another in the order they are defined. You can also set the
+execution mode explicitly:
+
+.. code-block:: python
+
+    from tasktronaut import ExecutionMode, ProcessDefinition
+
+    class SequentialProcess(ProcessDefinition):
+        execution_mode = ExecutionMode.SEQUENTIAL
+
+        def define_process(self, builder: Builder):
+            builder.task(self.task_a)
+            builder.task(self.task_b)
+            builder.task(self.task_c)
+
+Tasks execute in order: task_a, then task_b, then task_c.
+
+Concurrent Execution
+~~~~~~~~~~~~~~~~~~~~
+
+Execute multiple tasks in parallel:
+
+.. code-block:: python
+
+    class ConcurrentProcess(ProcessDefinition):
+        execution_mode = ExecutionMode.CONCURRENT
+
+        def define_process(self, builder: Builder):
+            builder.task(self.task_a)
+            builder.task(self.task_b)
+            builder.task(self.task_c)
+
+All three tasks may execute simultaneously.
+
+
+Process Structure
+-----------------
+
+Sequential and Concurrent Blocks
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Group tasks into sequential or concurrent sections within a larger process:
+
+.. code-block:: python
+
+    class MixedProcess(ProcessDefinition):
+        def define_process(self, builder: Builder):
+            builder.task(self.setup)
+
+            # Run these tasks sequentially
+            with builder.sequential() as b:
+                b.task(self.step_one)
+                b.task(self.step_two)
+
+            # Run these tasks concurrently
+            with builder.concurrent() as b:
+                b.task(self.parallel_task_a)
+                b.task(self.parallel_task_b)
+
+            builder.task(self.cleanup)
+
+This allows fine-grained control over execution flow within a single process.
+
+
+Input Arguments
+---------------
+
+Defining Expected Arguments
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Specify the input arguments your process accepts with type hints:
+
+.. code-block:: python
+
+    import datetime
+    from typing import List, Optional
+
+    class ProcessWithArguments(ProcessDefinition):
+        def define_process(self, builder: Builder):
+            builder.expected_arguments(
+                name=str,
+                count=int,
+                date=datetime.date,
+                tags=List[str],
+                description=Optional[str],
+                data=None,  # Any type accepted
+            )
+            builder.task(self.process_data)
+
+        def process_data(self, name: str, count: int, **kwargs):
+            pass
+
+These arguments are passed to your tasks and can be accessed via keyword arguments or through the Context object.
+
+
+Conditional Logic
+-----------------
+
+Using Options
+~~~~~~~~~~~~~
+
+Include or exclude tasks based on boolean options:
+
+.. code-block:: python
+
+    class ConditionalProcess(ProcessDefinition):
+        def define_process(self, builder: Builder):
+            if builder.option("enable_validation", False):
+                builder.task(self.validate_input)
+
+            builder.task(self.process)
+
+            if builder.option("enable_logging", True):
+                builder.task(self.log_results)
+
+The first argument is the option name, and the second is the default value. Tasks are only added if the option is True.
+
+
+Iteration
+---------
+
+The Each Pattern
+~~~~~~~~~~~~~~~~
+
+Iterate over a collection and execute tasks for each item:
+
+.. code-block:: python
+
+    from typing import Iterator, List
+
+    class IterativeProcess(ProcessDefinition):
+        def items(self, files: List[str], **_) -> Iterator[dict]:
+            for file in files:
+                yield {"filename": file}
+
+        def define_process(self, builder: Builder):
+            for iteration in builder.each(self.items):
+                iteration.task(self.process_file)
+
+        def process_file(self, filename: str, **kwargs):
+            print(f"Processing {filename}")
+
+The ``items`` method is a generator that yields dictionaries of arguments for each iteration.
+
+**With Descriptions:**
+
+Optionally provide descriptions for each iteration:
+
+.. code-block:: python
+
+    from typing import Iterator, List, Tuple
+
+    class IterativeProcessWithDescriptions(ProcessDefinition):
+        def items(self, files: List[str], **_) -> Iterator[Tuple[dict, str]]:
+            for file in files:
+                yield ({"filename": file}, f"Processing {file}")
+
+        def define_process(self, builder: Builder):
+            for iteration in builder.each(self.items):
+                iteration.task(self.process_file)
+
+
+Transformation
+--------------
+
+The Transform Pattern
+~~~~~~~~~~~~~~~~~~~~~
+
+Transform input arguments before executing tasks:
+
+.. code-block:: python
+
+    class TransformingProcess(ProcessDefinition):
+        def transform_input(self, raw_data: str, **_) -> dict:
+            return {
+                "processed_data": raw_data.upper(),
+            }
+
+        def define_process(self, builder: Builder):
+            with builder.transform(self.transform_input) as transformed:
+                transformed.task(self.process)
+
+        def process(self, processed_data: str, **kwargs):
+            pass
+
+The transform method receives the input arguments and returns a dictionary of transformed arguments.
+
+**With Descriptions:**
+
+Include descriptions for transformed sections:
+
+.. code-block:: python
+
+    from typing import Tuple
+
+    class TransformingProcessWithDescription(ProcessDefinition):
+        def transform_input(self, item_id: int, **_) -> Tuple[dict, str]:
+            return (
+                {"processed_id": item_id * 2},
+                f"Processing item {item_id}",
+            )
+
+        def define_process(self, builder: Builder):
+            with builder.transform(self.transform_input) as transformed:
+                transformed.task(self.process)
+
+
+Sub-Processes
+-------------
+
+Nesting Processes
+~~~~~~~~~~~~~~~~~
+
+Include another ``ProcessDefinition`` as a sub-process:
+
+.. code-block:: python
+
+    class ValidationProcess(ProcessDefinition):
+        def define_process(self, builder: Builder):
+            builder.task(self.check_schema)
+            builder.task(self.check_constraints)
+
+    class MainProcess(ProcessDefinition):
+        def define_process(self, builder: Builder):
+            builder.task(self.load_data)
+            builder.sub_process(ValidationProcess)
+            builder.task(self.save_data)
+
+Sub-processes allow you to compose larger workflows from smaller, reusable process definitions.
+
+
+Advanced Patterns
+-----------------
+
+Combining Multiple Patterns
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Complex workflows can combine multiple patterns:
+
+.. code-block:: python
+
+    class ComplexProcess(ProcessDefinition):
+        def get_batches(self, items: List[str], **_) -> Iterator[Tuple[dict, str]]:
+            for i in range(0, len(items), 10):
+                batch = items[i:i+10]
+                yield ({"batch": batch}, f"Batch {i//10}")
+
+        def define_process(self, builder: Builder):
+            builder.task(self.setup)
+
+            for batch_iter in builder.each(self.get_batches):
+                with batch_iter.transform(self.prepare_batch) as prepared:
+                    with prepared.concurrent() as b:
+                        b.task(self.process_item_a)
+                        b.task(self.process_item_b)
+
+            builder.task(self.finalize)
+
+This process iterates over batches, transforms each batch with a ``prepare_batch`` method (not shown), and then processes items concurrently within each batch.
+
+
+Best Practices
+--------------
+
+- **Keep tasks focused**: Each task should have a single responsibility.
+- **Use descriptions**: Add task descriptions for clarity and debugging.
+- **Handle exceptions**: Include error handling within task methods (see the sketch below).
+- **Leverage context**: Use the Context object to access execution metadata when needed.
+- **Organize related tasks**: Use sub-processes to organize large workflows into logical units.
+- **Document arguments**: Clearly define expected arguments in ``expected_arguments()``.
+- **Use type hints**: Include type hints for better IDE support and documentation.
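+
+Error Handling Sketch
+~~~~~~~~~~~~~~~~~~~~~
+
+To illustrate the error-handling practice above, here is a minimal sketch combining the
+``Context`` object with the ``CancelProcessError`` and ``NonRetryableProcessError``
+exceptions from ``tasktronaut.errors``. The ``payload`` argument and ``persist`` step are
+hypothetical, and the sketch assumes that raising ``CancelProcessError`` cancels the
+remainder of the process while ``NonRetryableProcessError`` fails the task without retrying:
+
+.. code-block:: python
+
+    from tasktronaut import Builder, Context, ProcessDefinition, task
+    from tasktronaut.errors import CancelProcessError, NonRetryableProcessError
+
+    class RobustProcess(ProcessDefinition):
+        def define_process(self, builder: Builder):
+            builder.expected_arguments(payload=dict)
+            builder.task(self.validate)
+            builder.task(self.persist)
+
+        @task(description="Validates the payload")
+        def validate(self, payload: dict, context: Context, **kwargs):
+            if not payload:
+                # nothing to process, so stop the whole process cleanly
+                raise CancelProcessError("empty payload")
+            if "id" not in payload:
+                # a malformed payload will never succeed, so don't retry
+                raise NonRetryableProcessError("payload is missing 'id'")
+
+        def persist(self, payload: dict, **kwargs):
+            ...  # hypothetical: write the validated payload to storage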
diff --git a/docs/source/user_guide.rst b/docs/source/user_guide.rst deleted file mode 100644 index 62b8c5a..0000000 --- a/docs/source/user_guide.rst +++ /dev/null @@ -1,2 +0,0 @@ -User Guide -========== From be744ece3dcdf27aeda8115b310ad8cf69242f45 Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Wed, 12 Nov 2025 20:53:27 +0000 Subject: [PATCH 11/15] fix for task functions without `context` parameter --- src/tasktronaut/backend.py | 10 ++++++- tests/mocks.py | 12 +++++++++ tests/test_backend.py | 55 ++++++++++++++++++++++++++++++++------ 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/src/tasktronaut/backend.py b/src/tasktronaut/backend.py index 0a894b0..c86264a 100644 --- a/src/tasktronaut/backend.py +++ b/src/tasktronaut/backend.py @@ -2,6 +2,7 @@ Backend module. """ +import inspect import logging from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Protocol, Union @@ -190,6 +191,10 @@ def perform_task( definition = load_definition(module_name, definition_class) task_func = getattr(definition, function_name) + + signature = inspect.signature(task_func) + has_context = "context" in signature.parameters + context = Context() try: with definition.around_task( @@ -198,7 +203,10 @@ def perform_task( description=description, context=context, ): - task_func(context=context, **kwargs) + if has_context: + task_func(context=context, **kwargs) + else: + task_func(**kwargs) except NonRetryableProcessError as e: logger.error( diff --git a/tests/mocks.py b/tests/mocks.py index 924f4c1..2ea569d 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -54,6 +54,18 @@ def define_process(self, builder: Builder): class MockDefinition(ProcessDefinition): + def mock_hook(self, *_, **__): + pass + + def task_with_no_args(self): + self.mock_hook() + + def task_with_args(self, *args, **kwargs): + self.mock_hook(*args, **kwargs) + + def task_with_context(self, *args, context: Context, **kwargs): + self.mock_hook(*args, context=context, **kwargs) + @task def task_success(self, *_, **__): pass diff --git a/tests/test_backend.py b/tests/test_backend.py index 5724496..5db700d 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -19,7 +19,7 @@ def test_perform_start(self, mock_on_started): module_name=self.module_name, definition_class=self.definition_class, ) - self.assertTrue(mock_on_started.was_called()) + self.assertTrue(mock_on_started.called) @mock.patch.object(MockDefinition, "around_task") def test_perform_task_around_task(self, mock_around_task): @@ -32,7 +32,46 @@ def test_perform_task_around_task(self, mock_around_task): description="foo", kwargs={}, ) - self.assertTrue(mock_around_task.was_called()) + self.assertTrue(mock_around_task.called) + + @mock.patch.object(MockDefinition, "mock_hook") + def test_perform_task_with_no_args(self, mock_hook): + self.backend.perform_task( + job=MockJob(), + identifier=self.identifier, + module_name=self.module_name, + definition_class=self.definition_class, + function_name=MockDefinition.task_with_no_args.__name__, + description="foo", + kwargs={}, + ) + self.assertTrue(mock_hook.called) + + @mock.patch.object(MockDefinition, "mock_hook") + def test_perform_task_with_args(self, mock_hook): + self.backend.perform_task( + job=MockJob(), + identifier=self.identifier, + module_name=self.module_name, + definition_class=self.definition_class, + function_name=MockDefinition.task_with_args.__name__, + description="foo", + kwargs={}, + ) + self.assertTrue(mock_hook.called) + + @mock.patch.object(MockDefinition, "mock_hook") 
+ def test_perform_task_with_context(self, mock_hook): + self.backend.perform_task( + job=MockJob(), + identifier=self.identifier, + module_name=self.module_name, + definition_class=self.definition_class, + function_name=MockDefinition.task_with_context.__name__, + description="foo", + kwargs={}, + ) + self.assertTrue(mock_hook.called) @mock.patch.object(MockDefinition, "on_failed") def test_perform_task_on_failed(self, mock_on_failed): @@ -47,7 +86,7 @@ def test_perform_task_on_failed(self, mock_on_failed): description="foo", kwargs={}, ) - self.assertTrue(mock_on_failed.was_called()) + self.assertTrue(mock_on_failed.called) @mock.patch.object(MockDefinition, "on_failed") def test_perform_task_on_failed_no_retry(self, mock_on_failed): @@ -62,7 +101,7 @@ def test_perform_task_on_failed_no_retry(self, mock_on_failed): description="foo", kwargs={}, ) - self.assertTrue(mock_on_failed.was_called()) + self.assertTrue(mock_on_failed.called) @mock.patch.object(MockDefinition, "on_cancelled") @mock.patch.object(MockJob, "cancel") @@ -82,9 +121,9 @@ def test_perform_task_on_cancelled( description="foo", kwargs={}, ) - self.assertTrue(mock_job_delete_dependents.was_called()) - self.assertTrue(mock_job_cancel.was_called()) - self.assertTrue(mock_on_cancelled.was_called()) + self.assertTrue(mock_job_delete_dependents.called) + self.assertTrue(mock_job_cancel.called) + self.assertTrue(mock_on_cancelled.called) @mock.patch.object(MockDefinition, "on_completed") def test_perform_complete(self, mock_on_completed): @@ -93,4 +132,4 @@ def test_perform_complete(self, mock_on_completed): module_name=self.module_name, definition_class=self.definition_class, ) - self.assertTrue(mock_on_completed.was_called()) + self.assertTrue(mock_on_completed.called) From 60e390cfe213747cc8b3ba9388ab5ae692d46423 Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Wed, 12 Nov 2025 21:08:37 +0000 Subject: [PATCH 12/15] add supporting `rq-info` command --- Taskfile.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Taskfile.yml b/Taskfile.yml index c4bcfc1..88dc79c 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -101,3 +101,8 @@ tasks: desc: Run Redis Service cmds: - docker run --rm -p 127.0.0.1:6379:6379 docker.io/redis:8 + + rq-info: + desc: Show RQ information + cmds: + - uv run rq info --interval 5 From 31dff23ddcd8a0b4f5c95c0c8ef99ec4e2db01ce Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Wed, 12 Nov 2025 21:22:31 +0000 Subject: [PATCH 13/15] add greeting example process --- examples/README.md | 1 + examples/__init__.py | 0 examples/greeting/__init__.py | 0 examples/greeting/example | 3 +++ examples/greeting/greeting_process.py | 25 +++++++++++++++++++++++++ examples/greeting/main.py | 21 +++++++++++++++++++++ examples/greeting/worker | 3 +++ 7 files changed, 53 insertions(+) create mode 100644 examples/README.md create mode 100644 examples/__init__.py create mode 100644 examples/greeting/__init__.py create mode 100755 examples/greeting/example create mode 100644 examples/greeting/greeting_process.py create mode 100755 examples/greeting/main.py create mode 100755 examples/greeting/worker diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..b1aee5c --- /dev/null +++ b/examples/README.md @@ -0,0 +1 @@ +## Example Applications diff --git a/examples/__init__.py b/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/greeting/__init__.py b/examples/greeting/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/greeting/example 
b/examples/greeting/example new file mode 100755 index 0000000..2a9da53 --- /dev/null +++ b/examples/greeting/example @@ -0,0 +1,3 @@ +#!/bin/bash +set -e +uv run main.py diff --git a/examples/greeting/greeting_process.py b/examples/greeting/greeting_process.py new file mode 100644 index 0000000..bf5386a --- /dev/null +++ b/examples/greeting/greeting_process.py @@ -0,0 +1,25 @@ +import logging + +import tasktronaut as ttn + +logger = logging.getLogger(__name__) + + +class GreetingProcess(ttn.ProcessDefinition): + """A simple process that greets the user.""" + + def define_process(self, builder: ttn.Builder): + builder.expected_arguments(name=str) + + builder.task(self.say_hello) + builder.task(self.say_name) + builder.task(self.say_goodbye) + + def say_hello(self, name: str): + logger.info(f"Hello {name}!") + + def say_name(self, name: str): + logger.info(f"Nice to meet you, {name}.") + + def say_goodbye(self, name: str): + logger.info(f"Goodbye {name}!") diff --git a/examples/greeting/main.py b/examples/greeting/main.py new file mode 100755 index 0000000..02919b1 --- /dev/null +++ b/examples/greeting/main.py @@ -0,0 +1,21 @@ +import redis +import rq + +from greeting_process import GreetingProcess +from tasktronaut.backends.rq import RqBackend + + +def main(): + connection = redis.Redis() + queue = rq.Queue(connection=connection) + backend = RqBackend(queue=queue) + + process = GreetingProcess.build(name="Chris") + job = process.enqueue(backend=backend) + + print("Process enqueued.") + print(job) + + +if __name__ == "__main__": + main() diff --git a/examples/greeting/worker b/examples/greeting/worker new file mode 100755 index 0000000..87fc6a0 --- /dev/null +++ b/examples/greeting/worker @@ -0,0 +1,3 @@ +#!/bin/bash +set -e +uv run rq worker high default low From 57c411dd8524edc2945326713b438ff677ad133c Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Wed, 12 Nov 2025 21:25:12 +0000 Subject: [PATCH 14/15] ensure `uv.lock` is up-to-date --- Taskfile.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/Taskfile.yml b/Taskfile.yml index 88dc79c..2cb1dcf 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -75,6 +75,7 @@ tasks: prompt: Bump Python package version from {{.CURRENT}} to {{.NEXT}}? cmds: - uv version --bump minor + - uv sync - git add pyproject.toml uv.lock - git commit -m "bump version" From 037499315e3d2e4d99b4bd17099607420c0cd37d Mon Sep 17 00:00:00 2001 From: Chris Stefano Date: Wed, 12 Nov 2025 21:25:31 +0000 Subject: [PATCH 15/15] bump version --- pyproject.toml | 2 +- uv.lock | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9fd6043..b8f0088 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "tasktronaut" -version = "0.2.0" +version = "0.3.0" description = "A simple task orchestration library for running complex processes or workflows in Python." readme = "README.md" requires-python = ">=3.10" diff --git a/uv.lock b/uv.lock index 5110044..d6675e3 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.12'", @@ -1627,7 +1627,7 @@ wheels = [ [[package]] name = "tasktronaut" -version = "0.2.0" +version = "0.3.0" source = { editable = "." } dependencies = [ { name = "pydantic" },