Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ jobs:
fail-fast: true
matrix:
os: ["ubuntu-latest"]
python-version: ["3.11", "3.12", "3.13"]
python-version: ["3.12", "3.13", "3.14"]
experimental: [false]
include:
- python-version: "3.13"
- python-version: "3.14"
os: "ubuntu-latest"
experimental: true

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "Pytroll workflow execution framework"
readme = "README.md"
license = "GPL-3.0-or-later"
license-files = ["LICENSE"]
requires-python = ">=3.11"
requires-python = ">=3.12"
authors = [
{ name = "Adam Dybbroe" },
{ name = "Gerrit Holl" },
Expand Down
7 changes: 6 additions & 1 deletion trollflow2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@
# are not necessary
"""Base module for trollflow2."""

from functools import cache
from importlib.metadata import version
from multiprocessing import Manager

MP_MANAGER = Manager()

@cache
def get_manager():
    """Return the process-wide multiprocessing ``Manager``.

    ``functools.cache`` turns this into a lazy singleton: the Manager
    (and its server process) is only started on the first call, and
    every subsequent call returns the same instance.
    """
    manager = Manager()
    return manager

__version__ = version(__name__)
4 changes: 2 additions & 2 deletions trollflow2/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
except ImportError:
ListenerContainer = None

from trollflow2 import MP_MANAGER
from trollflow2 import get_manager
from trollflow2.dict_tools import gen_dict_extract, plist_iter
from trollflow2.logging import (create_logged_process, logging_on,
queued_logging)
Expand Down Expand Up @@ -237,7 +237,7 @@ def _run_subprocess(self, messages):
def _run_product_list_on_messages(self, messages, target_fun, process_creator):
"""Run the product list on the messages."""
for msg in messages:
produced_files_queue = MP_MANAGER.Queue()
produced_files_queue = get_manager().Queue()
kwargs = dict(produced_files=produced_files_queue, prod_list=self.product_list)
proc = process_creator(target=target_fun, args=(msg,), kwargs=kwargs)
start_time = datetime.now()
Expand Down
11 changes: 7 additions & 4 deletions trollflow2/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from logging import getLogger
from logging.handlers import QueueHandler, QueueListener

from trollflow2 import MP_MANAGER
from trollflow2 import get_manager

DEFAULT_LOG_CONFIG = {'version': 1,
'disable_existing_loggers': False,
Expand All @@ -38,7 +38,10 @@
'formatter': 'pytroll'}},
'root': {'level': 'DEBUG', 'handlers': ['console']}}

LOG_QUEUE = MP_MANAGER.Queue()
@functools.cache
def get_log_queue():
    """Return the shared queue used to ship log records between processes.

    Created lazily on first use and cached, so every caller shares a
    single queue backed by the singleton Manager.
    """
    manager = get_manager()
    return manager.Queue()

LOG_CONFIG = None

Expand All @@ -59,7 +62,7 @@ def logging_on(config=None):
with configure_logging(config):
root.handlers.extend(handlers)
# set up and run listener
listener = QueueListener(LOG_QUEUE, *(root.handlers))
listener = QueueListener(get_log_queue(), *(root.handlers))
listener.start()
try:
yield
Expand Down Expand Up @@ -152,7 +155,7 @@ def create_logged_process(target, args, kwargs=None):
from multiprocessing import get_context
if kwargs is None:
kwargs = {}
kwargs["log_queue"] = LOG_QUEUE
kwargs["log_queue"] = get_log_queue()
kwargs["log_config"] = LOG_CONFIG
ctx = get_context('spawn')
proc = ctx.Process(target=target, args=args, kwargs=kwargs)
Expand Down
77 changes: 43 additions & 34 deletions trollflow2/tests/test_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -970,60 +970,69 @@ def test_sigterm_generate_messages(tmp_path):
import signal
from multiprocessing import Process

from trollflow2.launcher import generate_messages
ipc_path = os.fspath(tmp_path / "my_pipe")
connection_parameters = {"nameserver": False, "addresses": f"ipc://{ipc_path}", "topic": "/test"}
proc = Process(target=generate_messages, args=(connection_parameters, ))

proc = Process(target=run_generate_messages, args=(connection_parameters, ))
proc.start()
# Wait for the message listening loop to start
time.sleep(1)
time.sleep(5)
assert proc.is_alive()
# Send SIGTERM
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert not proc.is_alive()
assert proc.exitcode == 0


def run_generate_messages(conn):
    """Exhaust the ``generate_messages`` generator.

    Helper used as a subprocess target so that the message-listening
    loop actually runs inside the child process.
    """
    from trollflow2.launcher import generate_messages
    message_stream = generate_messages(conn)
    for _message in message_stream:
        pass


def _fake_queue_logged_process(msg, prod_list, produced_files, **kwargs):
    """Stand-in for ``queue_logged_process`` that only simulates slow work."""
    processing_time = 5.0
    time.sleep(processing_time)


# NOTE(review): this span is a diff with the +/- markers lost in the paste —
# it interleaves lines of the REMOVED ``test_sigterm_runner`` with lines of
# the ADDED ``test_runner_executes_worker_successfully``. The code bytes are
# kept untouched below; comments flag which lines appear to belong to which
# version — confirm against the actual PR diff before relying on this.
# NOTE(review): the next four lines look like the removed test's decorators
# and signature — TODO confirm they carry '-' markers in the real diff.
@mock.patch("trollflow2.launcher.ListenerContainer")
@mock.patch("trollflow2.launcher.queue_logged_process",
new=_fake_queue_logged_process)
def test_sigterm_runner(lc_, tmp_path):
"""Test that sending sigterm to Trollflow2 stops it."""
import os
import signal
from multiprocessing import Process

# NOTE(review): from here the ADDED test begins; it writes a product list
# and a test message to disk, runs the Runner synchronously, and checks
# that the worker left a proof file.
def test_runner_executes_worker_successfully(tmp_path):
"""Test that Runner can successfully spawn a worker process."""
from posttroll.message import Message

from trollflow2.launcher import Runner

proof_file = tmp_path / "proof.txt"
yaml_config = f"""
proof_file: {str(proof_file)}
product_list:
areas:
test_area:
products:
test_product:
hello: world

workers:
- fun: !!python/name:trollflow2.tests.test_launcher.touch_worker
"""

config_file = tmp_path / "trollflow2.yaml"
with open(config_file, "w") as f:
f.write(yaml_config)

msg = Message("/my/topic", atype="file", data={"filename": "foo"})
# NOTE(review): the next three mock/listener lines presumably belong to the
# removed test (the new test uses a test_message file, not a listener).
listener = mock.MagicMock()
listener.output_queue.get.return_value = msg
lc_.return_value = listener
message_file = tmp_path / "message.txt"
with open(message_file, "w") as f:
f.write(str(msg))

# NOTE(review): the ``product_list``/``yaml_test1`` lines look removed;
# the ``runner = Runner(config_file, ...)`` lines look added — confirm.
product_list = tmp_path / "trollflow2.yaml"
with open(product_list, "w") as fid:
fid.write(yaml_test1)
runner = Runner(config_file, {}, test_message=str(message_file))
runner.run()

connection_parameters = {}
runner = Runner(product_list, connection_parameters)
assert proof_file.exists(), "The worker process did not create the proof file."
with open(proof_file) as f:
assert "I ran successfully" in f.read()

# NOTE(review): everything below (Process/SIGTERM/timing assertions) looks
# like the tail of the removed sigterm test.
proc = Process(target=runner.run)
proc.start()
tic = time.time()
# Wait for the message listening loop to start
time.sleep(1)
# Send SIGTERM
os.kill(proc.pid, signal.SIGTERM)
proc.join()

assert proc.exitcode == 0
# The fake processing takes 5 seconds, so it should be at least
# this long until the process is terminated
elapsed_time = time.time() - tic
assert elapsed_time > 5.0
def touch_worker(job, **kwargs):
    """A worker that proves it ran by creating a file.

    Writes a marker string to the path found under
    ``job["product_list"]["proof_file"]``.
    """
    proof_path = job["product_list"]["proof_file"]
    with open(proof_path, "w") as proof:
        proof.write("I ran successfully inside a process!")
Loading