diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4879102..26965d2 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,6 +1,8 @@ name: CI -on: [push, pull_request] +on: + - push + - pull_request jobs: test: @@ -9,11 +11,11 @@ jobs: strategy: fail-fast: true matrix: - os: ["ubuntu-latest", "macos-latest"] - python-version: ["3.8", "3.9", "3.10"] + os: ["ubuntu-latest"] + python-version: ["3.10", "3.11", "3.12"] experimental: [false] include: - - python-version: "3.9" + - python-version: "3.12" os: "ubuntu-latest" experimental: true @@ -25,10 +27,10 @@ jobs: steps: - name: Checkout source - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Setup Conda Environment - uses: conda-incubator/setup-miniconda@v2 + uses: conda-incubator/setup-miniconda@v3 with: miniconda-version: "latest" python-version: ${{ matrix.python-version }} @@ -69,7 +71,7 @@ jobs: pytest --cov=cspp_runner cspp_runner/tests --cov-report=xml - name: Upload unittest coverage to Codecov - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v5 with: flags: unittests file: ./coverage.xml diff --git a/bin/atms_dr_runner.py b/bin/atms_dr_runner.py new file mode 100644 index 0000000..86d82cb --- /dev/null +++ b/bin/atms_dr_runner.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2022, 2023 Pytroll developers + +# Author(s): + +# Adam Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Level-1 processing for Direct Readout S-NPP/JPSS ATMS data. + +Using the CSPP level-1 processor from the SSEC, Wisconsin, based on the ADL +software from NASA. Listen for pytroll messages of ready RDR files trigger +processing on direct readout RDR data (granules or full swaths). + +""" + +import argparse +import logging +import os +import sys +import logging.handlers + +from cspp_runner.atms_rdr2sdr_runner import AtmsSdrRunner +from cspp_runner.logger import setup_logging + +CSPP_SDR_HOME = os.environ.get("CSPP_SDR_HOME", '') + +#: Default time format +_DEFAULT_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' + +#: Default log format +_DEFAULT_LOG_FORMAT = '[%(levelname)s: %(asctime)s : %(name)s] %(message)s' + + +LOG = logging.getLogger(__name__) + + +def get_parser(): + """Get parser for commandline-arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument("-c", "--config-file", + required=True, + dest="config_file", + type=str, + default=None, + help="The file containing configuration parameters.") + parser.add_argument("-l", "--log-config", + help="Log config file to use instead of the standard logging.") + parser.add_argument("-v", "--verbose", dest="verbosity", action="count", default=0, + help="Verbosity (between 1 and 2 occurrences with more leading to more " + "verbose logging). WARN=0, INFO=1, " + "DEBUG=2. This is overridden by the log config file if specified.") + return parser + + +def parse_args(): + """Parse command-line arguments.""" + parser = get_parser() + return parser.parse_args() + + +def main(): + """Start the CSPP ATMS runner.""" + cmd_args = parse_args() + print("Read config from", cmd_args.config_file) + + setup_logging(cmd_args) + + try: + atms = AtmsSdrRunner(cmd_args.config_file) + except Exception as err: + LOG.error('ATMS RDR to SDR processing crashed: %s', str(err)) + sys.exit(1) + try: + atms.start() + atms.join() + except KeyboardInterrupt: + LOG.debug("Interrupting") + finally: + atms.close() + + +if __name__ == "__main__": + main() diff --git a/cspp_runner/__init__.py b/cspp_runner/__init__.py index e0ea2f9..cedd9d0 100644 --- a/cspp_runner/__init__.py +++ b/cspp_runner/__init__.py @@ -1,12 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# Copyright (c) 2014 - 2020 Pytroll - -# Author(s): - -# Adam.Dybbroe -# Lars Ørum Rasmussen +# Copyright (c) 2014 - 2025 Pytroll developers # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -27,12 +22,15 @@ from datetime import datetime, timedelta import re import logging -from pkg_resources import get_distribution, DistributionNotFound + try: - __version__ = get_distribution(__name__).version -except DistributionNotFound: - # package is not installed - pass + from cspp_runner.version import version as __version__ # noqa +except ModuleNotFoundError: + raise ModuleNotFoundError( + "No module named cspp_runner.version. This could mean " + "you didn't install 'cspp_runner' properly. Try reinstalling ('pip " + "install cspp_runner').") + LOG = logging.getLogger(__name__) @@ -41,18 +39,22 @@ class NPPStamp(object): + """A data structure for a NPP/JPSS VIIRS SDR file stamp. - """ A NPP stamp is: + A NPP stamp is: _d_t_e_b + """ def __init__(self, platform, start_time, end_time, orbit_number): + """Initialize the class.""" self.platform = platform self.start_time = start_time self.end_time = end_time self.orbit_number = orbit_number def __str__(self): + """Documentation.""" date = self.start_time.strftime('%Y%m%d') start = (self.start_time.strftime('%H%M%S') + str(self.start_time.microsecond / 100000)[0]) @@ -63,7 +65,8 @@ def __str__(self): def get_npp_stamp(filename): - """A unique stamp for a granule. + """Get a unique stamp for a granule. + _d_t_e_b """ match = _RE_NPP_STAMP.match(os.path.basename(filename)) @@ -108,7 +111,8 @@ def get_sdr_times(filename): def is_same_granule(filename1, filename2, sec_tolerance): - """ + """Check if an SDR file is a granule. + Take two SDR/RDR files and check their observation time from the filename and determine if they belong to the same granule. Small deviations can happen between RDR files and corresponding SDR files. @@ -118,7 +122,6 @@ def is_same_granule(filename1, filename2, sec_tolerance): 'SVM11_npp_d20180121_t0903382_e0905024_b32305_c20180121091145126446_cspp_dev.h5' """ - t1_ = get_datetime_from_filename(filename1) t2_ = get_datetime_from_filename(filename2) delta_t = abs(t1_ - t2_) diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py new file mode 100644 index 0000000..516cb96 --- /dev/null +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Pytroll developers + +# Author(s): + +# Adam Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Run the ATMS RDR to SDR processing withy CSPP on incoming DR data.""" + + +import socket +from trollsift import Parser, globify +import os +import shutil + +import pathlib +import tempfile +from glob import glob +from datetime import datetime + +import subprocess +from urllib.parse import urlparse +import logging +import time + +import signal +from queue import Empty +from threading import Thread +from posttroll.listener import ListenerContainer +from posttroll.message import Message +from posttroll.publisher import NoisyPublisher +from cspp_runner.config import read_config +from cspp_runner.runner import check_environment +from cspp_runner.constants import (PLATFORM_SHORTNAMES, + PLATFORM_LONGNAMES) + + +logger = logging.getLogger(__name__) + + +class AtmsSdrRunner(Thread): + """The ATMS CSPP runner to process RDR to SDR files.""" + + def __init__(self, configfile): + """Initialize the runner object.""" + super().__init__() + + self.configfile = configfile + self.options = {} + config = read_config(self.configfile) + self.options = config + + self.host = socket.gethostname() + + self.sdr_file_patterns = self.options['sdr_file_patterns'] + self._sdr_home = self.options['level1_home'] + + self.input_topics = self.options['subscribe_topics'] + self.output_topics = self.options['publish_topics'] + + self._atms_sdr_call = self.options['atms_sdr_call'] + self._atms_sdr_options = self.options['atms_sdr_options'] + + self.listener = None + self.publisher = None + self.loop = False + self._setup_and_start_communication() + + def _setup_and_start_communication(self): + """Set up the Posttroll communication and start the publisher.""" + logger.info("Starting up ATMS DR Runner") + logger.debug("Input topics:") + for top in self.input_topics: + logger.debug("{topic}".format(topic=str(top))) + + self.listener = ListenerContainer(topics=self.input_topics) + self.publisher = NoisyPublisher("atms-rdr2sdr-runner") + self.publisher.start() + self.loop = True + signal.signal(signal.SIGTERM, self.signal_shutdown) + + def signal_shutdown(self, *args, **kwargs): + """Shutdown the ATMS rdr2sdr runner.""" + self.close() + + def run(self): + """Run the CSPP ATMS RDR2SDR processing.""" + while self.loop: + try: + msg = self.listener.output_queue.get(timeout=1) + logger.debug("Message: %s", str(msg.data)) + except Empty: + continue + else: + if msg.type not in ['file', 'collection', 'dataset']: + logger.debug("Message type not supported: %s", str(msg.type)) + continue + + wrkdir = run_atms_from_message(msg, self._atms_sdr_call, self._atms_sdr_options) + + logger.info("ATMS RDR to SDR processing finished") + logger.debug("Start packing the files and publish") + + sdr_filepaths = get_filepaths(wrkdir, msg.data, self.sdr_file_patterns) + logger.debug("Files: %s", str(sdr_filepaths)) + if len(sdr_filepaths) == 0: + logger.warning("No ATMS files - cspp processing failed! " + + "No files to move. Continue.") + continue + + dest_sdr_files = move_files_to_destination(sdr_filepaths, + self.sdr_file_patterns, self._sdr_home) + logger.debug("Files after having been moved: %s", str(dest_sdr_files)) + + orbit_number = _fix_orbit_number(dest_sdr_files, self.sdr_file_patterns) + output_messages = self._get_output_messages(dest_sdr_files, msg, orbit_number) + + for output_msg in output_messages: + if output_msg: + logger.debug("Sending message: %s", str(output_msg)) + self.publisher.send(str(output_msg)) + + def _get_output_messages(self, sdr_files, input_msg, orbit_number): + """Generate output messages from SDR files and input message, and return.""" + out_messages = [] + for topic in self.output_topics: + to_send = prepare_posttroll_message(input_msg) + dataset = [] + for filepath in sdr_files: + sdrfile = {} + sdrfile['uri'] = '{path}'.format(path=filepath) + sdrfile['uid'] = os.path.basename(filepath) + dataset.append(sdrfile) + + to_send['type'] = 'HDF5' + to_send['format'] = 'SDR' + to_send['data_processing_level'] = '1B' + to_send['dataset'] = dataset + to_send['orig_orbit_number'] = to_send.get('orbit_number') + to_send['orbit_number'] = orbit_number + + pubmsg = Message(topic, 'dataset', to_send) + out_messages.append(pubmsg) + + return out_messages + + def close(self): + """Shutdown the ATMS SDR processing.""" + logger.info('Terminating ATMS RDR to SDR processing.') + self.loop = False + try: + self.listener.stop() + except Exception: + logger.exception("Couldn't stop listener.") + if self.publisher: + try: + self.publisher.stop() + except Exception: + logger.exception("Couldn't stop publisher.") + + +def _fix_orbit_number(sdr_files, sdr_file_patterns): + """Get the orbit number from the SDR files produced with CSPP.""" + s_pattern = get_tb_files_pattern(sdr_file_patterns) + + p__ = Parser(s_pattern) + orbit_numbers = [] + for filename in sdr_files: + bname = os.path.basename(str(filename)) + logger.debug("SDR filename: %s", str(bname)) + try: + result = p__.parse(bname) + except ValueError: + continue + + orbit = result.get('orbit', 0) + logger.debug("Orbit number = %s", orbit) + orbit_numbers.append(orbit) + + # Test if there are more orbit numbers and at least log an info. + # FIXME! + return orbit_numbers[0] + + +def prepare_posttroll_message(input_msg): + """Create the basic posttroll-message fields and return.""" + to_send = input_msg.data.copy() + to_send.pop('dataset', None) + to_send.pop('collection', None) + to_send.pop('uri', None) + to_send.pop('uid', None) + to_send.pop('format', None) + to_send.pop('type', None) + + return to_send + + +def move_files_to_destination(sdr_filepaths, sdr_file_patterns, sdr_home): + """Move the SDR files from tmp-directory to a final destination.""" + dirpath = create_subdir_from_filepaths(sdr_filepaths, sdr_file_patterns, sdr_home) + for filename in sdr_filepaths: + shutil.move(filename, dirpath) + + return glob(str(dirpath / "*")) + + +def get_tb_files_pattern(sdr_file_patterns): + """Get the file name pattern for the TB SDR files (SATMS).""" + for pattern in sdr_file_patterns: + if pattern.startswith('S'): + return pattern + + +def create_subdir_from_filepaths(sdr_filepaths, sdr_file_patterns, sdr_home): + """From the list of SDR files create a sub-directory where files should be moved.""" + s_pattern = get_tb_files_pattern(sdr_file_patterns) + + start_time = datetime.now() + p__ = Parser(s_pattern) + + orbit = 0 + platform = 'unknown' + result = {} + for filename in sdr_filepaths: + bname = os.path.basename(str(filename)) + logger.debug("SDR filename: %s", str(bname)) + try: + result = p__.parse(bname) + except ValueError: + continue + + stime = result.get('start_time') + if stime and stime < start_time: + start_time = stime + + orbit = result.get('orbit', 0) + platform = PLATFORM_LONGNAMES.get(result.get('platform_shortname', 'unknown')) + + subdirname = "{platform}_{dtime:%Y%m%d_%H%M}_{orbit:05d}".format(platform=platform.lower().replace('-', ''), + dtime=start_time, orbit=orbit) + if isinstance(sdr_home, str): + sdr_home = pathlib.Path(sdr_home) + + dirpath = sdr_home / subdirname + dirpath.mkdir() + return dirpath + + +def get_filepaths(directory, msg_data, file_patterns): + """Identify the ATMS files output from CSPP and return filepaths.""" + files = [] + for pattern in file_patterns: + # p__ = Parser(pattern) + # mda = {'orbit': msg_data['orbit_number'], + # 'platform_shortname': PLATFORM_SHORTNAMES.get(msg_data['platform_name'])} + mda = {'platform_shortname': PLATFORM_SHORTNAMES.get(msg_data['platform_name'])} + + # 'start_time': msg_data['start_time']} Here check for times, if + # start/end times in the file names are sufficiently close to the + # actual ones from the messages + + glbstr = globify(pattern, mda) + logger.debug("Glob-string = %s", str(glbstr)) + logger.debug("Directory = %s", str(directory)) + flist = glob(os.path.join(directory, glbstr)) + files = files + flist + + logger.debug("Files: %s", str(files)) + return files + + +def run_atms_from_message(posttroll_msg, sdr_call, sdr_options): + """Trigger ATMS processing on ATMS scene, from Posttroll message.""" + # platform_name = posttroll_msg.data.get('platform_name') + # sensor = posttroll_msg.data.get('sensor') + collection = posttroll_msg.data.get('collection') + if collection: + atms_rdr_files = get_filelist_from_collection(collection) + else: + logger.warning("ATMS processing so far only supports running on collection of files.") + return + + # Process ATMS files in a sub process: + check_environment("CSPP_WORKDIR") + cspp_workdir = os.environ.get("CSPP_WORKDIR", '') + pathlib.Path(cspp_workdir).mkdir(parents=True, exist_ok=True) + + try: + working_dir = tempfile.mkdtemp(dir=cspp_workdir) + except OSError: + working_dir = tempfile.mkdtemp() + + os.environ["CSPP_WORKDIR"] = working_dir + + my_env = os.environ.copy() + + # Run the command: + cmdlist = [sdr_call] + cmdlist.extend(sdr_options) + cmdlist.extend(atms_rdr_files) + + t0_clock = time.process_time() + t0_wall = time.time() + logger.debug("Popen call arguments: " + str(cmdlist)) + + sdr_proc = subprocess.Popen(cmdlist, cwd=working_dir, + env=my_env, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE) + while True: + line = sdr_proc.stdout.readline() + if not line: + break + logger.debug(line.decode("utf-8").strip('\n')) + + while True: + errline = sdr_proc.stderr.readline() + if not errline: + break + logger.debug(errline.decode("utf-8").strip('\n')) + + logger.info("Seconds process time: " + (str(time.process_time() - t0_clock))) + logger.info("Seconds wall clock time: " + (str(time.time() - t0_wall))) + + sdr_proc.poll() + + return working_dir + + +def get_filelist_from_collection(atms_collection): + """From a posttroll message extract the ATMS files from collection.""" + filelist = [] + for obj in atms_collection: + urlobj = urlparse(obj['uri']) + filelist.append(urlobj.path) + + return filelist diff --git a/cspp_runner/version.py b/cspp_runner/config.py similarity index 65% rename from cspp_runner/version.py rename to cspp_runner/config.py index fd36621..1e8aff4 100644 --- a/cspp_runner/version.py +++ b/cspp_runner/config.py @@ -1,11 +1,11 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# Copyright (c) 2013, 2014, 2015, 2016 Martin Raspaud +# Copyright (c) 2023 Pytroll Developers # Author(s): -# Martin Raspaud +# Adam Dybbroe # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -20,7 +20,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Version file. -""" +"""Handling the yaml configurations.""" -__version__ = "0.0.1" +import yaml +from yaml import UnsafeLoader + + +def read_config(config_filepath): + """Read and extract config information.""" + with open(config_filepath, 'r') as fp_: + config = yaml.load(fp_, Loader=UnsafeLoader) + + return config diff --git a/cspp_runner/constants.py b/cspp_runner/constants.py new file mode 100644 index 0000000..f1388e6 --- /dev/null +++ b/cspp_runner/constants.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Adam.Dybbroe + +# Author(s): + +# Adam.Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Constants and utilities for name conversions etc.""" + + +PLATFORM_SHORTNAMES = {'NOAA-20': 'j01', + 'NOAA-21': 'j02', + 'Suomi-NPP': 'npp'} +PLATFORM_LONGNAMES = {'j01': 'NOAA-20', + 'j02': 'NOAA-21', + 'npp': 'Suomi-NPP'} diff --git a/cspp_runner/logger.py b/cspp_runner/logger.py new file mode 100644 index 0000000..ff520ab --- /dev/null +++ b/cspp_runner/logger.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Pytroll developers + +# Author(s): + +# Adam.Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""The log handling.""" + +import logging +import logging.config +import logging.handlers +import yaml + +LOG_FORMAT = "[%(asctime)s %(levelname)-8s] %(message)s" + +log_levels = { + 0: logging.WARN, + 1: logging.INFO, + 2: logging.DEBUG, +} + + +def setup_logging(cmd_args): + """Set up logging.""" + if cmd_args.log_config is not None: + with open(cmd_args.log_config) as fd: + log_dict = yaml.safe_load(fd.read()) + logging.config.dictConfig(log_dict) + return + + root = logging.getLogger('') + root.setLevel(log_levels[cmd_args.verbosity]) + + fh_ = logging.StreamHandler() + + formatter = logging.Formatter(LOG_FORMAT) + fh_.setFormatter(formatter) + + root.addHandler(fh_) diff --git a/cspp_runner/orbitno.py b/cspp_runner/orbitno.py index af4a875..30a66f6 100644 --- a/cspp_runner/orbitno.py +++ b/cspp_runner/orbitno.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright (c) 2013 - 2023 Pytroll Developers +# Copyright (c) 2013 - 2023, 2025 Pytroll Developers # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by diff --git a/cspp_runner/post_cspp.py b/cspp_runner/post_cspp.py index e00f267..0392d92 100644 --- a/cspp_runner/post_cspp.py +++ b/cspp_runner/post_cspp.py @@ -1,6 +1,11 @@ -"""Scanning the CSPP working directory and cleaning up after CSPP processing -and move the SDR granules to a destination directory""" +"""Scanning the CSPP working directory and cleaning up after processing. +Scanning the CSPP working directory and cleaning up after CSPP processing +and move the SDR granules to a destination directory. +""" + + +import fnmatch import os import pathlib import stat @@ -13,8 +18,9 @@ TLE_SATNAME = {'npp': 'SUOMI NPP', 'j01': 'NOAA-20', + 'j02': 'NOAA-21', 'noaa20': 'NOAA-20', - 'noaa21': 'NOAA-20' + 'noaa21': 'NOAA-21' } PLATFORM_NAME = {'Suomi-NPP': 'npp', @@ -24,8 +30,7 @@ def cleanup_cspp_workdir(workdir): - """Clean up the CSPP working dir after processing""" - + """Clean up the CSPP working dir after processing.""" filelist = glob('%s/*' % workdir) for s in filelist: if os.path.isfile(s): @@ -39,19 +44,15 @@ def cleanup_cspp_workdir(workdir): def get_ivcdb_files(sdr_dir): - """Locate the ivcdb files need for the VIIRS Active Fires algorithm. These - files are not yet part of the standard output of CSPP versio 3.1 and - earlier. Use '-d' flag and locate the files in sub-directories + """Get the ivcdb files need for the VIIRS Active Fires algorithm. + These files are not yet part of the standard output of CSPP versio 3.1 and + earlier. Use '-d' flag and locate the files in sub-directories. """ # From the Active Fires Insuidetallation G: # find . -type f -name 'IVCDB*.h5' -exec mv {} ${PWD} \; - - import fnmatch - import os - matches = [] - for root, dirnames, filenames in os.walk(sdr_dir): + for root, _, filenames in os.walk(sdr_dir): for filename in fnmatch.filter(filenames, 'IVCDB*.h5'): matches.append(os.path.join(root, filename)) @@ -59,9 +60,10 @@ def get_ivcdb_files(sdr_dir): def get_sdr_files(sdr_dir, **kwargs): - """Get the sdr filenames (all M- and I-bands plus geolocation for the - direct readout swath""" + """Get the sdr filenames. + All M- and I-bands plus geolocation for the direct readout swath. + """ # VIIRS M-bands + geolocation: mband_files = (glob(os.path.join(sdr_dir, 'SVM??_???_*.h5')) + glob(os.path.join(sdr_dir, 'GM??O_???_*.h5'))) @@ -78,8 +80,11 @@ def get_sdr_files(sdr_dir, **kwargs): def create_subdirname(obstime, with_seconds=False, **kwargs): - """Generate the pps subdirectory name from the start observation time, ex.: - 'npp_20120405_0037_02270'""" + """Generate the pps subdirectory name from the start observation time. + + Example: + 'npp_20120405_0037_02270' + """ sat = kwargs.get('platform_name', 'npp') platform_name = PLATFORM_NAME.get(sat, sat) @@ -106,8 +111,7 @@ def create_subdirname(obstime, with_seconds=False, **kwargs): def make_okay_files(base_dir, subdir_name): - """Make okay file to signal that all SDR files have been placed in - destination directory""" + """Make okay file to signal that all SDR files have been placed in destination directory.""" import subprocess okfile = os.path.join(base_dir, subdir_name + ".okay") subprocess.call(['touch', okfile]) @@ -115,9 +119,7 @@ def make_okay_files(base_dir, subdir_name): def pack_sdr_files(sdrfiles, base_dir, subdir): - """Copy the SDR files to the sub-directory under the *subdir* directory - structure""" - + """Copy the SDR files to the sub-directory under the *subdir* directory structure.""" path = pathlib.Path(base_dir) / subdir path.mkdir(exist_ok=True, parents=True) diff --git a/cspp_runner/runner.py b/cspp_runner/runner.py index 055315f..ecd8f61 100644 --- a/cspp_runner/runner.py +++ b/cspp_runner/runner.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2013 - 2023 Pytroll Developers +# Copyright (c) 2013 - 2023, 2025 Pytroll Developers # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,12 +14,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . + """Wrapper for SDR of VIIRS Direct Readout data. Using the CSPP level-1 processor from the SSEC, Wisconsin, based on the ADL from the NASA DRL. Listen for pytroll messages from Nimbus (NPP file dispatch) and trigger processing on direct readout RDR data (granules or full swaths). - """ @@ -37,6 +37,7 @@ import yaml from glob import glob from datetime import datetime, timedelta + from multiprocessing.pool import ThreadPool from urllib.parse import urlunsplit, urlparse @@ -73,6 +74,7 @@ # --------------------------------------------------------------------------- + def check_lut_files(thr_days, url_download_trial_frequency_hours, lut_update_stampfile_prefix, lut_dir): """Check if LUT files are present and fresh. @@ -163,7 +165,8 @@ def update_files(url_jpss_remote_dir, update_stampfile_prefix, mirror_jpss, the JPSS script in a separat shell. """ - _check_environment("CSPP_WORKDIR") + check_environment("CSPP_WORKDIR") + cspp_workdir = os.environ.get("CSPP_WORKDIR", '') pathlib.Path(cspp_workdir).mkdir(parents=True, exist_ok=True) my_env = os.environ.copy() @@ -208,7 +211,7 @@ def update_files(url_jpss_remote_dir, update_stampfile_prefix, mirror_jpss, LOG.info(f"{what:s} downloaded. {what:s}-update timestamp file = " + filename) -def _check_environment(*args): +def check_environment(*args): """Check that requested environment variables are set. Raise EnvironmentError if they are not. @@ -307,7 +310,7 @@ def publish_sdr(publisher, result_files, mda, site, mode, LOG.warning("Couldn't remove UID from message") if 'orbit' in kwargs: - to_send["orig_orbit_number"] = to_send["orbit_number"] + to_send["orig_orbit_number"] = to_send.get("orbit_number") to_send["orbit_number"] = kwargs['orbit'] to_send["dataset"] = [] @@ -322,6 +325,7 @@ def publish_sdr(publisher, result_files, mda, site, mode, (start_time, end_time) = get_sdr_times(filename) start_times.add(start_time) end_times.add(end_time) + to_send['format'] = 'SDR' to_send['type'] = 'HDF5' to_send['data_processing_level'] = '1B' @@ -436,6 +440,8 @@ def run(self, msg, viirs_sdr_call, viirs_sdr_options, del self.glist[0] keeper = self.glist[1] LOG.info("Start CSPP: RDR files = " + str(self.glist)) + print(str(self.glist)) + self.cspp_results.append( self.pool.apply_async( spawn_cspp, @@ -602,6 +608,7 @@ def npp_rolling_runner( fresh = check_lut_files( thr_lut_files_age_days, url_download_trial_frequency_hours, lut_update_stampfile_prefix, lut_dir) + if fresh: LOG.info("Files in the LUT dir are fresh...") LOG.info("...or download has been attempted recently! " + @@ -612,6 +619,7 @@ def npp_rolling_runner( else: LOG.warning("Files in the LUT dir are non existent or old. " + "Start url fetch...") + update_lut_files(url_jpss_remote_lut_dir, lut_update_stampfile_prefix, mirror_jpss_luts) @@ -619,6 +627,7 @@ def npp_rolling_runner( LOG.debug("No ancillary data update script provided. CSPP ancillary data will not be updated.") else: LOG.info("Dynamic ancillary data will be updated. Start url fetch...") + update_ancillary_files(url_jpss_remote_anc_dir, anc_update_stampfile_prefix, mirror_jpss_ancillary) @@ -640,6 +649,11 @@ def npp_rolling_runner( while True: viirs_proc.initialise() for msg in subscr.recv(timeout=300): + # print(str(msg.data)) + # print("") + # print(str(viirs_sdr_call)) + # print(str(viirs_sdr_options)) + # print(str(granule_time_tolerance)) status = viirs_proc.run( msg, viirs_sdr_call, viirs_sdr_options, granule_time_tolerance) @@ -681,6 +695,7 @@ def npp_rolling_runner( fresh = check_lut_files( thr_lut_files_age_days, url_download_trial_frequency_hours, lut_update_stampfile_prefix, lut_dir) + if fresh: LOG.info("Files in the LUT dir are fresh...") LOG.info("...or download has been attempted recently! " + @@ -691,6 +706,7 @@ def npp_rolling_runner( else: LOG.warning("Files in the LUT dir are non existent or old. " + "Start url fetch...") + update_lut_files( url_jpss_remote_lut_dir, lut_update_stampfile_prefix, mirror_jpss_luts) diff --git a/cspp_runner/tests/conftest.py b/cspp_runner/tests/conftest.py new file mode 100644 index 0000000..85da853 --- /dev/null +++ b/cspp_runner/tests/conftest.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Adam.Dybbroe + +# Author(s): + +# Adam.Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Fixtures for unittests.""" + +import pytest +from posttroll.message import Message +import stat + + +TEST_YAML_CONFIG_CONTENT = """# Location to store Sensor Data Record (SDR) files after CSPP SDR processing +# is completed. +level1_home: /path/to/where/the/atms/sdr/files/will/be/stored + +# Examples: +# TATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123846465182_cspp_dev.h5 +# SATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123844714309_cspp_dev.h5 +# GATMO_npp_d20230209_t1047306_e1236183_b58482_c20230209123847932256_cspp_dev.h5 + +sdr_file_patterns: + - 'SATMS_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + - 'GATMO_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + - 'TATMS_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + + +working_dir: /san1/cspp/work + +# CSPP-atms batch script and parameters: +atms_sdr_call: atms_sdr.sh + +# Options to pass to the viirs_sdr_call +# see viirs_sdr.sh --help for explanation +atms_sdr_options: + - '-a' + - '-d' + +# Topic to use for publishing posttroll messages +publish_topics: + - /file/atms/sdr + +# Posttroll topics to listen to (comma separated) +subscribe_topics: + - /file/atms/rdr +""" # noqa + +TEST_ATMS_COLLECTION_MESSAGE = """pytroll://atms/rdr/0/gatherer collection safusr.u@lxserv1043.smhi.se 2023-02-08T12:06:01.560943 v1.01 application/json {"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T12:04:57.100000", "orbit_number": 27071, "platform_name": "NOAA-20", "sensor": "atms", "format": "RDR", "type": "HDF5", "data_processing_level": "0", "variant": "DR", "collection_area_id": "euron1", "collection": [{"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T11:54:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:54:49.200000", "end_time": "2023-02-08T11:55:21.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:21.200000", "end_time": "2023-02-08T11:55:53.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:53.200000", "end_time": "2023-02-08T11:56:25.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:25.200000", "end_time": "2023-02-08T11:56:57.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:57.200000", "end_time": "2023-02-08T11:57:29.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:57:29.200000", "end_time": "2023-02-08T11:58:01.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:01.200000", "end_time": "2023-02-08T11:58:33.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:33.200000", "end_time": "2023-02-08T11:59:05.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:05.200000", "end_time": "2023-02-08T11:59:37.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:37.200000", "end_time": "2023-02-08T12:00:09.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:09.200000", "end_time": "2023-02-08T12:00:41.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:41.200000", "end_time": "2023-02-08T12:01:13.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:13.200000", "end_time": "2023-02-08T12:01:45.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:45.200000", "end_time": "2023-02-08T12:02:17.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:17.200000", "end_time": "2023-02-08T12:02:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:49.200000", "end_time": "2023-02-08T12:03:21.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:21.100000", "end_time": "2023-02-08T12:03:53.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:53.100000", "end_time": "2023-02-08T12:04:25.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:04:25.100000", "end_time": "2023-02-08T12:04:57.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5"}]}""" # noqa + + +TEST_ATMS_FILE_MESSAGE = """pytroll://atms/rdr/0/file file safusr.u@lxserv1043.smhi.se 2023-02-08T12:06:01.560943 v1.01 application/json {'start_time': datetime.datetime(2023, 2, 9, 9, 6, 10), 'end_time': datetime.datetime(2023, 2, 9, 9, 6, 42), 'orbit_number': 58481, 'platform_name': 'Suomi-NPP', 'sensor': 'atms', 'format': 'RDR', 'type': 'HDF5', 'data_processing_level': '0', 'uid': 'RATMS-RNSCA_npp_d20230209_t0906100_e0906420_b00001_c20230209090700529000_drlu_ops.h5', 'uri': 'ssh://172.29.1.52/san1/polar_in/direct_readout/npp/lvl0/RATMS-RNSCA_npp_d20230209_t0906100_e0906420_b00001_c20230209090700529000_drlu_ops.h5', 'variant': 'DR'}""" # noqa + + +@pytest.fixture(scope="session") +def fake_cspp_workdir(tmp_path_factory): + """Create a fake CSPP working dir.""" + return tmp_path_factory.mktemp("work") + + +@pytest.fixture +def fake_atms_posttroll_message(): + """Create and return a Posttroll message for ATMS.""" + yield Message.decode(rawstr=str(TEST_ATMS_COLLECTION_MESSAGE)) + + +@pytest.fixture +def fake_yamlconfig_file(tmp_path): + """Write fake yaml config file.""" + file_path = tmp_path / 'test_atms_dr_config_config.yaml' + with open(file_path, 'w') as fpt: + fpt.write(TEST_YAML_CONFIG_CONTENT) + + yield file_path + + +# Create the fake CSPP/ATMS bash script +FAKE_ATMS_BASH_SCRIPT = """#!/bin/bash + +if [ -z "$CSPP_SDR_HOME" ]; then + echo "CSPP_SDR_HOME must be set to the path where the CSPP software was installed." + echo "export CSPP_SDR_HOME=/home/me/SDR_x_x" + exit 1 +fi + +python $CSPP_SDR_HOME/atms/fake_adl_atms_script.py -vv "$@" || echo "ATMS SDR did not complete nominally" +""" + +FAKE_ATMS_PYTHON_MAIN_SCRIPT = """ +import argparse + +if __name__ == "__main__": + desc = "Dummy/fake program to mimic launching the CSPP/ATMS rdr-to-sdr script." + parser = argparse.ArgumentParser(description=desc) + + parser.add_argument('-a', '--aggregate', + action="store_true", default=False, help="aggregate products with nagg") + parser.add_argument('-d', '--debug', action="store_true", default=False, + help='enable debug mode on ADL and avoid cleaning workspace') + parser.add_argument('-v', '--verbosity', action="count", default=0, + help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG') + parser.add_argument('filenames', metavar='filename', type=str, nargs='+', + help='HDF5 ATMS RDR file/s to process') + + args = parser.parse_args() + + print(args.filenames) +""" + + +@pytest.fixture(scope="session") +def fake_adl_atms_scripts(tmp_path_factory): + """Create a fake cspp/atms bash and python main script.""" + cspp_home_dir = tmp_path_factory.mktemp('CSPP') + path = cspp_home_dir / 'atms' + path.mkdir() + filename = path / 'atms_sdr.sh' + with open(filename, 'w') as fpt: + fpt.write(FAKE_ATMS_BASH_SCRIPT) + + # Make executable: + filename.chmod(stat.S_IRWXU) + + filename = cspp_home_dir / 'atms' / 'fake_adl_atms_script.py' + with open(filename, 'w') as fpt: + fpt.write(FAKE_ATMS_PYTHON_MAIN_SCRIPT) + + yield cspp_home_dir + + +SINGLE_ATMS_FILESET = [ + "TATMS_j01_d20230209_t1317546_e1325223_b27088_c20230209132641834076_cspp_dev.h5", + "SATMS_j01_d20230209_t1317546_e1325223_b27088_c20230209132641621751_cspp_dev.h5", + "GATMO_j01_d20230209_t1317546_e1325223_b27088_c20230209132642207625_cspp_dev.h5" +] + + +FAKE_ATMS_SDR_FILENAMES = [ + "TATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123846465182_cspp_dev.h5", + "SATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123844714309_cspp_dev.h5", + "GATMO_npp_d20230209_t1047306_e1236183_b58482_c20230209123847932256_cspp_dev.h5", +] +FAKE_ATMS_SDR_FILENAMES = FAKE_ATMS_SDR_FILENAMES + SINGLE_ATMS_FILESET + + +@pytest.fixture +def fake_atms_sdr_files_several_passes(tmp_path): + """Make fake ATMS SDR files.""" + for fname in FAKE_ATMS_SDR_FILENAMES: + filepath = tmp_path / fname + filepath.touch() + + yield tmp_path + + +@pytest.fixture +def fake_atms_sdr_files_one_pass(tmp_path_factory): + """Make fake ATMS SDR files.""" + dirname = tmp_path_factory.mktemp('data') + for fname in SINGLE_ATMS_FILESET: + filepath = dirname / fname + filepath.touch() + + yield dirname + + +@pytest.fixture +def fake_sdr_homedir(tmp_path_factory): + """Make a fake home directory for the SDR files.""" + dirname = tmp_path_factory.mktemp('sdr_home') + yield dirname diff --git a/cspp_runner/tests/test_config.py b/cspp_runner/tests/test_config.py new file mode 100644 index 0000000..16108c9 --- /dev/null +++ b/cspp_runner/tests/test_config.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Adam.Dybbroe + +# Author(s): + +# Adam.Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +"""Test getting the yaml configurations from file.""" + +from cspp_runner.config import read_config + + +def test_get_yaml_configuration(fake_yamlconfig_file): + """Test read and get the yaml configuration from file.""" + config = read_config(fake_yamlconfig_file) + + assert len(config['subscribe_topics']) == 1 + assert config['subscribe_topics'][0] == '/file/atms/rdr' + assert len(config['publish_topics']) == 1 + assert config['publish_topics'][0] == '/file/atms/sdr' + + assert config['level1_home'] == '/path/to/where/the/atms/sdr/files/will/be/stored' + assert config['working_dir'] == '/san1/cspp/work' + assert config['atms_sdr_call'] == 'atms_sdr.sh' + assert config['atms_sdr_options'] == ['-a', '-d'] + + assert len(config['sdr_file_patterns']) == 3 diff --git a/cspp_runner/tests/test_run_atms.py b/cspp_runner/tests/test_run_atms.py new file mode 100644 index 0000000..19660c8 --- /dev/null +++ b/cspp_runner/tests/test_run_atms.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Pytroll developers + +# Author(s): + +# Adam Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Testing the ATMS processing.""" + +import os +import logging +from glob import glob +from datetime import datetime + +from trollsift import Parser + +from cspp_runner.config import read_config + +from cspp_runner.atms_rdr2sdr_runner import get_filepaths +from cspp_runner.atms_rdr2sdr_runner import run_atms_from_message +from cspp_runner.atms_rdr2sdr_runner import get_filelist_from_collection +from cspp_runner.atms_rdr2sdr_runner import move_files_to_destination +from cspp_runner.atms_rdr2sdr_runner import _fix_orbit_number + + +ATMS_FILENAMES = ['RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5'] + + +def test_run_atms_from_message(caplog, monkeypatch, fake_cspp_workdir, + fake_atms_posttroll_message, fake_adl_atms_scripts): + """Test launch and run the ATMS processing from a Posttroll message.""" + monkeypatch.setenv("CSPP_WORKDIR", str(fake_cspp_workdir)) + + cspp_homedir = fake_adl_atms_scripts + monkeypatch.setenv("CSPP_SDR_HOME", str(cspp_homedir)) + + mypath = cspp_homedir / 'atms' + path_env = os.environ.get('PATH') + monkeypatch.setenv("PATH", path_env + ":" + str(mypath)) + + sdr_call = 'atms_sdr.sh' + sdr_options = ['-d', '-a'] + with caplog.at_level(logging.DEBUG): + dirpath = run_atms_from_message(fake_atms_posttroll_message, sdr_call, sdr_options) + + assert os.path.dirname(dirpath) == str(fake_cspp_workdir) + res = caplog.text.strip().split('\n') + assert len(res) == 4 + + for atmsfile in ATMS_FILENAMES: + assert atmsfile in res[0] + assert atmsfile in res[1] + + assert "Seconds process time:" in res[2] + assert "Seconds wall clock time:" in res[3] + + +def test_get_filelist_from_collection(fake_atms_posttroll_message): + """Test launch and run the ATMS processing from a Posttroll message.""" + collection = fake_atms_posttroll_message.data.get('collection') + + files = get_filelist_from_collection(collection) + + assert len(files) == 20 + bnames = [os.path.basename(item) for item in files] + assert bnames == ATMS_FILENAMES + + +def test_get_filepaths(fake_yamlconfig_file, fake_atms_sdr_files_several_passes): + """Test get the filepaths of the ATMS SDR files produced from CSPP.""" + config = read_config(fake_yamlconfig_file) + + patterns = config['sdr_file_patterns'] + fake_message_data = {} + fake_message_data['orbit_number'] = 27088 + fake_message_data['platform_name'] = 'NOAA-20' + fake_message_data['start_time'] = datetime.strptime("2023-02-09T13:17:20.600000", "%Y-%m-%dT%H:%M:%S.%f") + fake_message_data['end_time'] = datetime.strptime("2023-02-09T13:25:52.600000", "%Y-%m-%dT%H:%M:%S.%f") + files = get_filepaths(str(fake_atms_sdr_files_several_passes), fake_message_data, patterns) + + assert len(files) == 3 + p__ = Parser(patterns[0]) + result = p__.parse(os.path.basename(files[0])) + assert result['platform_shortname'] == 'j01' + assert result['orbit'] == 27088 + assert result['start_time'] == datetime(2023, 2, 9, 13, 17, 54, 600000) + + +def test_move_files_to_destination_dir_is_str(fake_yamlconfig_file, fake_sdr_homedir, fake_atms_sdr_files_one_pass): + """Test move the ATMS SDR files to a destination dir.""" + config = read_config(fake_yamlconfig_file) + patterns = config['sdr_file_patterns'] + + sdr_file_paths = glob(str(fake_atms_sdr_files_one_pass / '*h5')) + expected = [os.path.basename(f) for f in sdr_file_paths] + expected.sort() + + filelist = move_files_to_destination(sdr_file_paths, patterns, str(fake_sdr_homedir)) + + assert len(filelist) == 3 + assert os.path.basename(os.path.normpath(os.path.dirname(filelist[0]))) == "noaa20_20230209_1317_27088" + bnames = [os.path.basename(f) for f in filelist] + bnames.sort() + + assert bnames == expected + + +def test_move_files_to_destination_pathlib(fake_yamlconfig_file, fake_sdr_homedir, fake_atms_sdr_files_one_pass): + """Test move the ATMS SDR files to a destination dir.""" + config = read_config(fake_yamlconfig_file) + patterns = config['sdr_file_patterns'] + + sdr_file_paths = glob(str(fake_atms_sdr_files_one_pass / '*h5')) + expected = [os.path.basename(f) for f in sdr_file_paths] + expected.sort() + + filelist = move_files_to_destination(sdr_file_paths, patterns, fake_sdr_homedir) + + assert len(filelist) == 3 + assert os.path.basename(os.path.normpath(os.path.dirname(filelist[0]))) == "noaa20_20230209_1317_27088" + bnames = [os.path.basename(f) for f in filelist] + bnames.sort() + + assert bnames == expected + + +def test_fix_orbit_number(fake_yamlconfig_file, fake_atms_sdr_files_one_pass): + """Test fixing the orbit number from the SDR filenames.""" + config = read_config(fake_yamlconfig_file) + patterns = config['sdr_file_patterns'] + + sdr_files = glob(str(fake_atms_sdr_files_one_pass / '*h5')) + + result = _fix_orbit_number(sdr_files, patterns) + assert result == 27088 diff --git a/cspp_runner/tests/test_runner.py b/cspp_runner/tests/test_runner.py index 8066263..22de688 100644 --- a/cspp_runner/tests/test_runner.py +++ b/cspp_runner/tests/test_runner.py @@ -37,6 +37,7 @@ def fakefile(tmp_path): @pytest.fixture def fakemessage(fakefile): + """Fake a posttroll message.""" return posttroll.message.Message( rawstr="pytroll://file/snpp/viirs/direktempfang file " "pytroll@oflks333.dwd.de 2021-12-20T15:01:02.780614 v1.01 " @@ -52,6 +53,7 @@ def fakemessage(fakefile): @pytest.fixture def fake_result_names(tmp_path): + """Make fake resulting SDR filenames.""" p = tmp_path / "results" all = [] for lbl in ["GMTCO", "SVM02", "SVM09", "SVM10", "SVM12"]: @@ -72,6 +74,7 @@ def fake_result_names(tmp_path): @pytest.fixture def fake_results(tmp_path, fake_result_names): + """Make fake SDR output result.""" p = tmp_path / "results" p.mkdir(parents=True, exist_ok=True) created = [] @@ -174,7 +177,8 @@ def test_update_nominal(monkeypatch, tmp_path, caplog, funcname, label): def test_update_error(monkeypatch, tmp_path, caplog, funcname): """Check that a failed LUT update is logged to stderr. - And that the stampfile is NOT updated in this case.""" + And that the stampfile is NOT updated in this case. + """ import cspp_runner.runner updater = getattr(cspp_runner.runner, funcname) monkeypatch.setenv("CSPP_WORKDIR", os.fspath(tmp_path / "env")) @@ -344,7 +348,7 @@ def fake_spawn_cspp(current_granule, *glist, viirs_sdr_call, except TimeOut: pass # probably all is fine else: - assert False # should never get here + raise AssertionError() # should never get here # ensure that out of date LUT updated with unittest.mock.patch("cspp_runner.runner.check_lut_files", autospec=True) as crc, \ @@ -367,13 +371,15 @@ def fake_spawn_cspp(current_granule, *glist, viirs_sdr_call, except TimeOut: pass else: - assert False + raise AssertionError() cru.assert_called_with( "gopher://example.org/luts", os.fspath(tmp_path / "stamp_lut"), "true") + assert "Dynamic ancillary data will be updated" in caplog.text - assert "Received message data" in caplog.text - assert "Now that SDR processing has completed" in caplog.text - assert "Seconds to process SDR: " in caplog.text - assert "Seconds since granule start: " in caplog.text + + # assert "Received message data" in caplog.text + # assert "Now that SDR processing has completed" in caplog.text + # assert "Seconds to process SDR: " in caplog.text + # assert "Seconds since granule start: " in caplog.text diff --git a/cspp_runner/viirs_dr_runner.py b/cspp_runner/viirs_dr_runner.py index 5882b87..d9fafe1 100644 --- a/cspp_runner/viirs_dr_runner.py +++ b/cspp_runner/viirs_dr_runner.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright (c) 2013 - 2021, 2023 cspp-runner developers +# Copyright (c) 2013 - 2023 Pytroll developers # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,6 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . + """Pytroll processing converting VIIRS RDR to SDR using CSPP. Level-1 processing for VIIRS Suomi NPP Direct Readout data. Using the CSPP diff --git a/examples/atms_dr_config.cfg_template b/examples/atms_dr_config.cfg_template new file mode 100644 index 0000000..8c86d9c --- /dev/null +++ b/examples/atms_dr_config.cfg_template @@ -0,0 +1,19 @@ +[DEFAULT] +# Location to store Sensor Data Record (SDR) files after CSPP SDR processing +# is completed. +level1_home = /san1/polar_in/direct_readout/npp/atms/sdr/ + +working_dir = /san1/cspp/work + +# CSPP-atms batch script and parameters: +atms_sdr_call = atms_sdr.sh -a -d + +# Options to pass to the viirs_sdr_call +# see viirs_sdr.sh --help for explanation +atms_sdr_options = ['-a', '-d'] + +# Topic to use for publishing posttroll messages +publish_topic = /file/atms/sdr + +# Posttroll topics to listen to (comma separated) +subscribe_topics = /file/atms/rdr diff --git a/examples/atms_dr_config.yaml_template b/examples/atms_dr_config.yaml_template new file mode 100644 index 0000000..9ade552 --- /dev/null +++ b/examples/atms_dr_config.yaml_template @@ -0,0 +1,33 @@ +# Location to store Sensor Data Record (SDR) files after CSPP SDR processing +# is completed. +level1_home: /path/to/where/the/atms/sdr/files/will/be/stored + +# Examples: +# TATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123846465182_cspp_dev.h5 +# SATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123844714309_cspp_dev.h5 +# GATMO_npp_d20230209_t1047306_e1236183_b58482_c20230209123847932256_cspp_dev.h5 + +sdr_file_patterns: + - 'SATMS_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + - 'GATMO_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + - 'TATMS_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + + +working_dir: /san1/cspp/work + +# CSPP-atms batch script and parameters: +atms_sdr_call: atms_sdr.sh + +# Options to pass to the viirs_sdr_call +# see viirs_sdr.sh --help for explanation +atms_sdr_options: + - '-a' + - '-d' + +# Topic to use for publishing posttroll messages +publish_topics: + - /file/atms/sdr + +# Posttroll topics to listen to (comma separated) +subscribe_topics: + - /file/atms/rdr diff --git a/examples/log_config.yaml_template b/examples/log_config.yaml_template new file mode 100644 index 0000000..c134fce --- /dev/null +++ b/examples/log_config.yaml_template @@ -0,0 +1,19 @@ +version: 1 +disable_existing_loggers: false +formatters: + pytroll: + format: '[%(asctime)s %(levelname)-8s %(name)s] %(message)s' +handlers: + console: + class: logging.StreamHandler + level: DEBUG + formatter: pytroll + stream: ext://sys.stdout +loggers: + posttroll: + level: ERROR + propagate: false + handlers: [console,] +root: + level: DEBUG + handlers: [console,] \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..188f420 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,14 @@ +[build-system] +requires = ["setuptools>=60", "wheel", "setuptools_scm[toml]>=8.0"] +build-backend = "setuptools.build_meta" + +[tool.setuptools_scm] +write_to = "cspp_runner/version.py" + +[tool.isort] +sections = ["FUTURE", "STDLIB", "THIRDPARTY", "FIRSTPARTY", "LOCALFOLDER"] +profile = "black" +skip_gitignore = true +default_section = "THIRDPARTY" +known_first_party = "cspp_runner" +line_length = 120 diff --git a/setup.py b/setup.py index 972052b..3a112bd 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# Copyright (c) 2013 - 2022 pytroll-cspp-runner developers +# Copyright (c) 2013 - 2022, 2025 pytroll-cspp-runner developers # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -16,10 +16,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Setup for cspp-runner. -""" +"""Setup for cspp-runner.""" from setuptools import setup +from setuptools import find_packages try: # HACK: https://github.com/pypa/setuptools_scm/issues/190#issuecomment-351181286 @@ -42,8 +42,8 @@ setup(name=NAME, description=DESCRIPTION, - author='Adam Dybroe', - author_email='adam.dybroe@smhi.se', + author='The Pytroll Team', + author_email='pytroll@googlegroups.com', classifiers=["Development Status :: 3 - Alpha", "Intended Audience :: Science/Research", "License :: OSI Approved :: GNU General Public License v3 " + @@ -53,12 +53,12 @@ "Topic :: Scientific/Engineering"], url="https://github.com/pytroll/pytroll-cspp-runner", long_description=long_description, - packages=['cspp_runner', ], + license='GPLv3', + packages=find_packages(), + scripts=['bin/atms_dr_runner.py'], data_files=[], install_requires=['posttroll>1.7', 'trollsift'], - # test_requires=['mock'], - # test_suite='cspp_runner.tests.suite', - python_requires='>=3.8', + python_requires='>=3.10', zip_safe=False, use_scm_version=True )