From b5fbeb70152af1708f58e98288b05d74a75040ac Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 20 Jan 2026 12:13:01 +0100 Subject: [PATCH 1/3] Support python 3.14 --- .github/workflows/ci.yaml | 4 ++-- pyproject.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b969006..19c43ec 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -10,10 +10,10 @@ jobs: fail-fast: true matrix: os: ["ubuntu-latest"] - python-version: ["3.11", "3.12", "3.13"] + python-version: ["3.12", "3.13", "3.14"] experimental: [false] include: - - python-version: "3.13" + - python-version: "3.14" os: "ubuntu-latest" experimental: true diff --git a/pyproject.toml b/pyproject.toml index b563201..f647bdb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ description = "Pytroll workflow execution framework" readme = "README.md" license = "GPL-3.0-or-later" license-files = ["LICENSE"] -requires-python = ">=3.11" +requires-python = ">=3.12" authors = [ { name = "Adam Dybbroe" }, { name = "Gerrit Holl" }, From 0cade423c42bb057d0543eff7ebfed4c4d13349d Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 20 Jan 2026 12:22:13 +0100 Subject: [PATCH 2/3] Fix python 3.14 compatibility --- trollflow2/__init__.py | 7 ++- trollflow2/launcher.py | 4 +- trollflow2/logging.py | 11 +++-- trollflow2/tests/test_launcher.py | 75 +++++++++++++++++-------------- 4 files changed, 57 insertions(+), 40 deletions(-) diff --git a/trollflow2/__init__.py b/trollflow2/__init__.py index f12076b..58e4495 100644 --- a/trollflow2/__init__.py +++ b/trollflow2/__init__.py @@ -25,9 +25,14 @@ # are not necessary """Base module for trollflow2.""" +from functools import cache from importlib.metadata import version from multiprocessing import Manager -MP_MANAGER = Manager() + +@cache +def get_manager(): + """Create a singleton Manager.""" + return Manager() __version__ = version(__name__) diff --git a/trollflow2/launcher.py b/trollflow2/launcher.py index 7d70f51..2dcb8bc 100644 --- a/trollflow2/launcher.py +++ b/trollflow2/launcher.py @@ -49,7 +49,7 @@ except ImportError: ListenerContainer = None -from trollflow2 import MP_MANAGER +from trollflow2 import get_manager from trollflow2.dict_tools import gen_dict_extract, plist_iter from trollflow2.logging import (create_logged_process, logging_on, queued_logging) @@ -237,7 +237,7 @@ def _run_subprocess(self, messages): def _run_product_list_on_messages(self, messages, target_fun, process_creator): """Run the product list on the messages.""" for msg in messages: - produced_files_queue = MP_MANAGER.Queue() + produced_files_queue = get_manager().Queue() kwargs = dict(produced_files=produced_files_queue, prod_list=self.product_list) proc = process_creator(target=target_fun, args=(msg,), kwargs=kwargs) start_time = datetime.now() diff --git a/trollflow2/logging.py b/trollflow2/logging.py index af72edb..7be8c2e 100644 --- a/trollflow2/logging.py +++ b/trollflow2/logging.py @@ -28,7 +28,7 @@ from logging import getLogger from logging.handlers import QueueHandler, QueueListener -from trollflow2 import MP_MANAGER +from trollflow2 import get_manager DEFAULT_LOG_CONFIG = {'version': 1, 'disable_existing_loggers': False, @@ -38,7 +38,10 @@ 'formatter': 'pytroll'}}, 'root': {'level': 'DEBUG', 'handlers': ['console']}} -LOG_QUEUE = MP_MANAGER.Queue() +@functools.cache +def get_log_queue(): + """Lazily create the shared logging queue.""" + return get_manager().Queue() LOG_CONFIG = None @@ -59,7 +62,7 @@ def logging_on(config=None): with configure_logging(config): root.handlers.extend(handlers) # set up and run listener - listener = QueueListener(LOG_QUEUE, *(root.handlers)) + listener = QueueListener(get_log_queue(), *(root.handlers)) listener.start() try: yield @@ -152,7 +155,7 @@ def create_logged_process(target, args, kwargs=None): from multiprocessing import get_context if kwargs is None: kwargs = {} - kwargs["log_queue"] = LOG_QUEUE + kwargs["log_queue"] = get_log_queue() kwargs["log_config"] = LOG_CONFIG ctx = get_context('spawn') proc = ctx.Process(target=target, args=args, kwargs=kwargs) diff --git a/trollflow2/tests/test_launcher.py b/trollflow2/tests/test_launcher.py index 5273da7..6495f17 100644 --- a/trollflow2/tests/test_launcher.py +++ b/trollflow2/tests/test_launcher.py @@ -970,60 +970,69 @@ def test_sigterm_generate_messages(tmp_path): import signal from multiprocessing import Process - from trollflow2.launcher import generate_messages ipc_path = os.fspath(tmp_path / "my_pipe") connection_parameters = {"nameserver": False, "addresses": f"ipc://{ipc_path}", "topic": "/test"} - proc = Process(target=generate_messages, args=(connection_parameters, )) + + proc = Process(target=run_generate_messages, args=(connection_parameters, )) proc.start() # Wait for the message listening loop to start time.sleep(1) + assert proc.is_alive() # Send SIGTERM os.kill(proc.pid, signal.SIGTERM) proc.join() - + assert not proc.is_alive() assert proc.exitcode == 0 +def run_generate_messages(conn): + from trollflow2.launcher import generate_messages + for _ in generate_messages(conn): + pass + + def _fake_queue_logged_process(msg, prod_list, produced_files, **kwargs): time.sleep(5.0) -@mock.patch("trollflow2.launcher.ListenerContainer") -@mock.patch("trollflow2.launcher.queue_logged_process", - new=_fake_queue_logged_process) -def test_sigterm_runner(lc_, tmp_path): - """Test that sending sigterm to Trollflow2 stops it.""" - import os - import signal - from multiprocessing import Process - +def test_runner_executes_worker_successfully(tmp_path): + """Test that Runner can successfully spawn a worker process.""" from posttroll.message import Message from trollflow2.launcher import Runner + proof_file = tmp_path / "proof.txt" + yaml_config = f""" + proof_file: {str(proof_file)} + product_list: + areas: + test_area: + products: + test_product: + hello: world + + workers: + - fun: !!python/name:trollflow2.tests.test_launcher.touch_worker + """ + + config_file = tmp_path / "trollflow2.yaml" + with open(config_file, "w") as f: + f.write(yaml_config) + msg = Message("/my/topic", atype="file", data={"filename": "foo"}) - listener = mock.MagicMock() - listener.output_queue.get.return_value = msg - lc_.return_value = listener + message_file = tmp_path / "message.txt" + with open(message_file, "w") as f: + f.write(str(msg)) - product_list = tmp_path / "trollflow2.yaml" - with open(product_list, "w") as fid: - fid.write(yaml_test1) + runner = Runner(config_file, {}, test_message=str(message_file)) + runner.run() - connection_parameters = {} - runner = Runner(product_list, connection_parameters) + assert proof_file.exists(), "The worker process did not create the proof file." + with open(proof_file) as f: + assert "I ran successfully" in f.read() - proc = Process(target=runner.run) - proc.start() - tic = time.time() - # Wait for the message listening loop to start - time.sleep(1) - # Send SIGTERM - os.kill(proc.pid, signal.SIGTERM) - proc.join() - assert proc.exitcode == 0 - # The fake processing takes 5 seconds, so it should be at least - # this long until the process is terminated - elapsed_time = time.time() - tic - assert elapsed_time > 5.0 +def touch_worker(job, **kwargs): + """A worker that proves it ran by creating a file.""" + with open(job["product_list"]["proof_file"], "w") as fd: + fd.write("I ran successfully inside a process!") From fd65f831fb0b16adb816bc83d6c7983de5e84e6c Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Tue, 20 Jan 2026 12:38:12 +0100 Subject: [PATCH 3/3] Wait longer --- trollflow2/tests/test_launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trollflow2/tests/test_launcher.py b/trollflow2/tests/test_launcher.py index 6495f17..05b100e 100644 --- a/trollflow2/tests/test_launcher.py +++ b/trollflow2/tests/test_launcher.py @@ -976,7 +976,7 @@ def test_sigterm_generate_messages(tmp_path): proc = Process(target=run_generate_messages, args=(connection_parameters, )) proc.start() # Wait for the message listening loop to start - time.sleep(1) + time.sleep(5) assert proc.is_alive() # Send SIGTERM os.kill(proc.pid, signal.SIGTERM)