From 4a1cd1928ecf3208a209518e3563053b53466cd2 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Wed, 8 Feb 2023 16:44:52 +0100 Subject: [PATCH 01/17] First attempts to add support for ATMS SDR processing Signed-off-by: Adam.Dybbroe --- bin/atms_dr_runner.py | 106 ++++++++++++++++ cspp_runner/atms_rdr2sdr_runner.py | 176 ++++++++++++++++++++++++++ cspp_runner/config.py | 34 +++++ cspp_runner/logger.py | 55 ++++++++ cspp_runner/runner.py | 93 +++++++------- cspp_runner/tests/conftest.py | 75 +++++++++++ cspp_runner/tests/test_config.py | 41 ++++++ cspp_runner/tests/test_run_atms.py | 79 ++++++++++++ cspp_runner/viirs_dr_runner.py | 53 ++++---- examples/atms_dr_config.yaml_template | 22 ++++ examples/log_config.yaml_template | 19 +++ setup.py | 7 +- 12 files changed, 683 insertions(+), 77 deletions(-) create mode 100644 bin/atms_dr_runner.py create mode 100644 cspp_runner/atms_rdr2sdr_runner.py create mode 100644 cspp_runner/config.py create mode 100644 cspp_runner/logger.py create mode 100644 cspp_runner/tests/conftest.py create mode 100644 cspp_runner/tests/test_config.py create mode 100644 cspp_runner/tests/test_run_atms.py create mode 100644 examples/atms_dr_config.yaml_template create mode 100644 examples/log_config.yaml_template diff --git a/bin/atms_dr_runner.py b/bin/atms_dr_runner.py new file mode 100644 index 0000000..7ad4a85 --- /dev/null +++ b/bin/atms_dr_runner.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2022, 2023 Pytroll developers + +# Author(s): + +# Adam Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Level-1 processing for Direct Readout S-NPP/JPSS ATMS data. + + +Using the CSPP level-1 processor from the SSEC, Wisconsin, based on the ADL +software from NASA. Listen for pytroll messages of ready RDR files trigger +processing on direct readout RDR data (granules or full swaths). + +""" + +import argparse +import logging +import ast +import configparser +import os +import sys +import logging +import logging.handlers + +from cspp_runner.atms_rdr2sdr_runner import AtmsSdrRunner +from cspp_runner.config import read_config +from cspp_runner.logger import setup_logging + +CSPP_SDR_HOME = os.environ.get("CSPP_SDR_HOME", '') + +#: Default time format +_DEFAULT_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' + +#: Default log format +_DEFAULT_LOG_FORMAT = '[%(levelname)s: %(asctime)s : %(name)s] %(message)s' + + +LOG = logging.getLogger(__name__) + + +def get_parser(): + """Get parser for commandline-arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument("-c", "--config-file", + required=True, + dest="config_file", + type=str, + default=None, + help="The file containing configuration parameters.") + parser.add_argument("-l", "--log-config", + help="Log config file to use instead of the standard logging.") + return parser + + +def parse_args(): + """Parse command-line arguments.""" + parser = get_parser() + return parser.parse_args() + + +def main(): + """Start the CSPP ATMS runner.""" + cmd_args = parse_args() + print("Read config from", cmd_args.config_file) + + setup_logging(cmd_args) + + #OPTIONS = read_config(cmd_args.config_file) + #publish_topics = OPTIONS.get('publish_topics') + #subscribe_topics = OPTIONS.get('subscribe_topics') + #viirs_sdr_options = OPTIONS.get("atms_sdr_options") + + breakpoint() + + try: + atms = AtmsSdrRunner(cmd_args.config_file) + except Exception as err: + LOG.error('ATMS RDR to SDR processing crashed: %s', str(err)) + sys.exit(1) + try: + atms.start() + atms.join() + except KeyboardInterrupt: + LOG.debug("Interrupting") + finally: + atms.close() + + +if __name__ == "__main__": + main() diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py new file mode 100644 index 0000000..a112f42 --- /dev/null +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Pytroll developers + +# Author(s): + +# Adam Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +""" +import socket +from trollsift import Parser, globify +import os +import pathlib +import subprocess +from urllib.parse import urlparse +import logging +import time + +import signal +from queue import Empty +from threading import Thread +from posttroll.listener import ListenerContainer +from posttroll.message import Message +from posttroll.publisher import NoisyPublisher +from cspp_runner.config import read_config +from cspp_runner.runner import check_environment + +logger = logging.getLogger(__name__) + + +class AtmsSdrRunner(Thread): + """The ATMS CSPP runner to process RDR to SDR files.""" + + def __init__(self, configfile): + """Initialize the runner object.""" + super().__init__() + + self.configfile = configfile + self.options = {} + config = read_config(self.configfile) + self.options = config + + self.input_topics = self.options['subscribe_topics'] + self.output_topics = self.options['publish_topics'] + + self._atms_sdr_call = self.options['atms_sdr_call'] + self._atms_sdr_options = self.options['atms_sdr_options'] + + self.listener = None + self.publisher = None + self.loop = False + self._setup_and_start_communication() + + def _setup_and_start_communication(self): + """Set up the Posttroll communication and start the publisher.""" + logger.debug("Starting up... Input topics:") + for top in self.input_topics: + logger.debug("{topic}".format(topic=str(top))) + + self.listener = ListenerContainer(topics=self.input_topics) + self.publisher = NoisyPublisher("atms-rdr2sdr-runner") + self.publisher.start() + self.loop = True + signal.signal(signal.SIGTERM, self.signal_shutdown) + + def signal_shutdown(self, *args, **kwargs): + """Shutdown the ATMS rdr2sdr runner.""" + self.close() + + def run(self): + """Run the CSPP ATMS RDR2SDR processing.""" + while self.loop: + try: + msg = self.listener.output_queue.get(timeout=1) + logger.debug("Message: %s", str(msg.data)) + except Empty: + continue + else: + if msg.type not in ['file', 'collection', 'dataset']: + logger.debug("Message type not supported: %s", str(msg.type)) + continue + + #platform_name = msg.data.get('platform_name') + + wrkdir = run_atms_from_message(msg, self._atms_sdr_call, self._atms_sdr_options) + + logger.warning("Do nothing...") + + # filename = get_filename_from_uri(msg.data.get('uri')) + # if not os.path.exists(filename): + # logger.warning("File does not exist!") + # continue + + # file_ok = check_file_type_okay(msg.data.get('type')) + + # for output_msg in output_messages: + # if output_msg: + # logger.debug("Sending message: %s", str(output_msg)) + # self.publisher.send(str(output_msg)) + + +def run_atms_from_message(posttroll_msg, sdr_call, sdr_options): + """Trigger ATMS processing on ATMS scene, from Posttroll message.""" + + platform_name = posttroll_msg.data.get('platform_name') + sensor = posttroll_msg.data.get('sensor') + + collection = posttroll_msg.data.get('collection') + if collection: + atms_rdr_files = get_filelist_from_collection(collection) + else: + logger.warning("ATMS processing so far only supports running on collection of files.") + return + + # Process ATMS files in a sub process: + check_environment("CSPP_WORKDIR") + cspp_workdir = os.environ.get("CSPP_WORKDIR", '') + pathlib.Path(cspp_workdir).mkdir(parents=True, exist_ok=True) + + my_env = os.environ.copy() + + # Run the command: + cmdlist = [sdr_call] + cmdlist.extend(sdr_options) + cmdlist.extend(atms_rdr_files) + + t0_clock = time.process_time() + t0_wall = time.time() + logger.info("Popen call arguments: " + str(cmdlist)) + + sdr_proc = subprocess.Popen(cmdlist, cwd=cspp_workdir, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE) + while True: + line = sdr_proc.stdout.readline() + if not line: + break + logger.info(line.decode("utf-8").strip('\n')) + + while True: + errline = sdr_proc.stderr.readline() + if not errline: + break + logger.info(errline.decode("utf-8").strip('\n')) + + logger.info("Seconds process time: " + (str(time.process_time() - t0_clock))) + logger.info("Seconds wall clock time: " + (str(time.time() - t0_wall))) + + sdr_proc.poll() + + return cspp_workdir + + +def get_filelist_from_collection(atms_collection): + """From a posttroll message extract the ATMS files from collection.""" + filelist = [] + for obj in atms_collection: + urlobj = urlparse(obj['uri']) + filelist.append(urlobj.path) + + return filelist diff --git a/cspp_runner/config.py b/cspp_runner/config.py new file mode 100644 index 0000000..1e8aff4 --- /dev/null +++ b/cspp_runner/config.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Pytroll Developers + +# Author(s): + +# Adam Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Handling the yaml configurations.""" + +import yaml +from yaml import UnsafeLoader + + +def read_config(config_filepath): + """Read and extract config information.""" + with open(config_filepath, 'r') as fp_: + config = yaml.load(fp_, Loader=UnsafeLoader) + + return config diff --git a/cspp_runner/logger.py b/cspp_runner/logger.py new file mode 100644 index 0000000..ff520ab --- /dev/null +++ b/cspp_runner/logger.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Pytroll developers + +# Author(s): + +# Adam.Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""The log handling.""" + +import logging +import logging.config +import logging.handlers +import yaml + +LOG_FORMAT = "[%(asctime)s %(levelname)-8s] %(message)s" + +log_levels = { + 0: logging.WARN, + 1: logging.INFO, + 2: logging.DEBUG, +} + + +def setup_logging(cmd_args): + """Set up logging.""" + if cmd_args.log_config is not None: + with open(cmd_args.log_config) as fd: + log_dict = yaml.safe_load(fd.read()) + logging.config.dictConfig(log_dict) + return + + root = logging.getLogger('') + root.setLevel(log_levels[cmd_args.verbosity]) + + fh_ = logging.StreamHandler() + + formatter = logging.Formatter(LOG_FORMAT) + fh_.setFormatter(formatter) + + root.addHandler(fh_) diff --git a/cspp_runner/runner.py b/cspp_runner/runner.py index e41554d..59c1d9a 100644 --- a/cspp_runner/runner.py +++ b/cspp_runner/runner.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013 - 2021 pytroll-cspp-runner developers +# Copyright (c) 2013 - 2021, 2023 pytroll-cspp-runner developers # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -144,17 +144,17 @@ def update_lut_files(url_jpss_remote_lut_dir, """ update_files( - url_jpss_remote_lut_dir, - lut_update_stampfile_prefix, - mirror_jpss_luts, - "LUT", - timeout=timeout) + url_jpss_remote_lut_dir, + lut_update_stampfile_prefix, + mirror_jpss_luts, + "LUT", + timeout=timeout) def update_files(url_jpss_remote_dir, update_stampfile_prefix, mirror_jpss, what, timeout=600): - _check_environment("CSPP_WORKDIR") + check_environment("CSPP_WORKDIR") cspp_workdir = os.environ.get("CSPP_WORKDIR", '') pathlib.Path(cspp_workdir).mkdir(parents=True, exist_ok=True) my_env = os.environ.copy() @@ -165,13 +165,13 @@ def update_files(url_jpss_remote_dir, update_stampfile_prefix, mirror_jpss, LOG.info(f"Download command for {what:s}: {cmd!s}") proc = subprocess.Popen( - cmd, shell=False, env=my_env, - cwd=cspp_workdir, - stderr=subprocess.PIPE, stdout=subprocess.PIPE) + cmd, shell=False, env=my_env, + cwd=cspp_workdir, + stderr=subprocess.PIPE, stdout=subprocess.PIPE) - while (line := proc.stdout.readline()): + while (line:=proc.stdout.readline()): LOG.info(line.decode("utf-8").strip('\n')) - while (line := proc.stderr.readline()): + while (line:=proc.stderr.readline()): LOG.error(line.decode("utf-8").strip('\n')) try: @@ -181,8 +181,8 @@ def update_files(url_jpss_remote_dir, update_stampfile_prefix, mirror_jpss, if returncode != 0: LOG.exception( - f"Attempt to update {what:s} files failed with exit code " - f"{returncode:d}.") + f"Attempt to update {what:s} files failed with exit code " + f"{returncode:d}.") else: now = datetime.utcnow() timestamp = now.strftime('%Y%m%d%H%M') @@ -199,7 +199,7 @@ def update_files(url_jpss_remote_dir, update_stampfile_prefix, mirror_jpss, LOG.info(f"{what:s} downloaded. {what:s}-update timestamp file = " + filename) -def _check_environment(*args): +def check_environment(*args): """Check that requested environment variables are set. Raise EnvironmentError if they are not. @@ -228,16 +228,15 @@ def update_ancillary_files(url_jpss_remote_anc_dir, """ update_files( - url_jpss_remote_anc_dir, - anc_update_stampfile_prefix, - mirror_jpss_ancillary, - "ANC", - timeout=timeout) + url_jpss_remote_anc_dir, + anc_update_stampfile_prefix, + mirror_jpss_ancillary, + "ANC", + timeout=timeout) def run_cspp(viirs_sdr_call, viirs_sdr_options, *viirs_rdr_files): - """Run CSPP on VIIRS RDR files""" - + """Run CSPP on VIIRS RDR files.""" LOG.info("viirs_sdr_options = " + str(viirs_sdr_options)) path = os.environ["PATH"] LOG.info("Path from environment: %s", str(path)) @@ -259,9 +258,9 @@ def run_cspp(viirs_sdr_call, viirs_sdr_options, *viirs_rdr_files): t0_wall = time.time() LOG.info("Popen call arguments: " + str(cmdlist)) viirs_sdr_proc = subprocess.Popen( - cmdlist, cwd=working_dir, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE) + cmdlist, cwd=working_dir, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE) while True: line = viirs_sdr_proc.stdout.readline() if not line: @@ -434,12 +433,12 @@ def run(self, msg, viirs_sdr_call, viirs_sdr_options, keeper = self.glist[1] LOG.info("Start CSPP: RDR files = " + str(self.glist)) self.cspp_results.append( - self.pool.apply_async( - spawn_cspp, - [keeper] + self.glist, - {"viirs_sdr_call": viirs_sdr_call, - "viirs_sdr_options": viirs_sdr_options, - "granule_time_tolerance": granule_time_tolerance})) + self.pool.apply_async( + spawn_cspp, + [keeper] + self.glist, + {"viirs_sdr_call": viirs_sdr_call, + "viirs_sdr_options": viirs_sdr_options, + "granule_time_tolerance": granule_time_tolerance})) LOG.debug("Inside run: Return with a False...") return False elif msg and ('platform_name' not in msg.data or 'sensor' not in msg.data): @@ -506,12 +505,12 @@ def run(self, msg, viirs_sdr_call, viirs_sdr_options, rdr_filename, orbnum = fix_rdrfile(rdr_filename) except IOError: LOG.exception( - 'Failed to fix orbit number in RDR file = ' + - str(urlobj.path)) + 'Failed to fix orbit number in RDR file = ' + + str(urlobj.path)) except cspp_runner.orbitno.NoTleFile: LOG.exception( - 'Failed to fix orbit number in RDR file = ' + - str(urlobj.path)) + 'Failed to fix orbit number in RDR file = ' + + str(urlobj.path)) LOG.error('No TLE file...') if orbnum: self.orbit_number = orbnum @@ -556,11 +555,11 @@ def run(self, msg, viirs_sdr_call, viirs_sdr_options, str([keeper] + self.glist)) LOG.info("Start time: %s", start_time.strftime('%Y-%m-%d %H:%M:%S')) self.cspp_results.append( - self.pool.apply_async( - spawn_cspp, - [keeper] + self.glist, - {"viirs_sdr_call": viirs_sdr_call, - "viirs_sdr_options": viirs_sdr_options})) + self.pool.apply_async( + spawn_cspp, + [keeper] + self.glist, + {"viirs_sdr_call": viirs_sdr_call, + "viirs_sdr_options": viirs_sdr_options})) if self.fullswath: LOG.info("Full swath. Break granules loop") return False @@ -588,15 +587,15 @@ def npp_rolling_runner( granule_time_tolerance=10, ncpus=1, publisher_config=None - ): +): """The NPP/VIIRS runner. Listens and triggers processing on RDR granules.""" LOG.info("*** Start the Suomi-NPP/JPSS SDR runner:") LOG.info("THR_LUT_FILES_AGE_DAYS = " + str(thr_lut_files_age_days)) fresh = check_lut_files( - thr_lut_files_age_days, url_download_trial_frequency_hours, - lut_update_stampfile_prefix, lut_dir) + thr_lut_files_age_days, url_download_trial_frequency_hours, + lut_update_stampfile_prefix, lut_dir) if fresh: LOG.info("Files in the LUT dir are fresh...") LOG.info("...or download has been attempted recently! " + @@ -631,8 +630,8 @@ def npp_rolling_runner( viirs_proc.initialise() for msg in subscr.recv(timeout=300): status = viirs_proc.run( - msg, viirs_sdr_call, viirs_sdr_options, - granule_time_tolerance) + msg, viirs_sdr_call, viirs_sdr_options, + granule_time_tolerance) LOG.debug("Sent message to run: %s", str(msg)) LOG.debug("Status: %s", str(status)) if not status: @@ -664,8 +663,8 @@ def npp_rolling_runner( LOG.info("Now that SDR processing has completed, " + "check for new LUT files...") fresh = check_lut_files( - thr_lut_files_age_days, url_download_trial_frequency_hours, - lut_update_stampfile_prefix, lut_dir) + thr_lut_files_age_days, url_download_trial_frequency_hours, + lut_update_stampfile_prefix, lut_dir) if fresh: LOG.info("Files in the LUT dir are fresh...") LOG.info("...or download has been attempted recently! " + diff --git a/cspp_runner/tests/conftest.py b/cspp_runner/tests/conftest.py new file mode 100644 index 0000000..485550a --- /dev/null +++ b/cspp_runner/tests/conftest.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Adam.Dybbroe + +# Author(s): + +# Adam.Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Fixtures for unittests.""" + +import pytest +from posttroll.message import Message + +TEST_YAML_CONFIG_CONTENT = """# Location to store Sensor Data Record (SDR) files after CSPP SDR processing +# is completed. +level1_home: /path/to/where/the/atms/sdr/files/will/be/stored + +working_dir: /san1/cspp/work + +# CSPP-atms batch script and parameters: +atms_sdr_call: atms_sdr.sh + +# Options to pass to the viirs_sdr_call +# see viirs_sdr.sh --help for explanation +atms_sdr_options: + - a + - d + +# Topic to use for publishing posttroll messages +publish_topics: + - /file/atms/sdr + +# Posttroll topics to listen to (comma separated) +subscribe_topics: + - /file/atms/rdr +""" + +TEST_ATMS_COLLECTION_MESSAGE = """pytroll://atms/rdr/0/gatherer collection safusr.u@lxserv1043.smhi.se 2023-02-08T12:06:01.560943 v1.01 application/json {"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T12:04:57.100000", "orbit_number": 27071, "platform_name": "NOAA-20", "sensor": "atms", "format": "RDR", "type": "HDF5", "data_processing_level": "0", "variant": "DR", "collection_area_id": "euron1", "collection": [{"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T11:54:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:54:49.200000", "end_time": "2023-02-08T11:55:21.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:21.200000", "end_time": "2023-02-08T11:55:53.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:53.200000", "end_time": "2023-02-08T11:56:25.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:25.200000", "end_time": "2023-02-08T11:56:57.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:57.200000", "end_time": "2023-02-08T11:57:29.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:57:29.200000", "end_time": "2023-02-08T11:58:01.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:01.200000", "end_time": "2023-02-08T11:58:33.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:33.200000", "end_time": "2023-02-08T11:59:05.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:05.200000", "end_time": "2023-02-08T11:59:37.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:37.200000", "end_time": "2023-02-08T12:00:09.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:09.200000", "end_time": "2023-02-08T12:00:41.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:41.200000", "end_time": "2023-02-08T12:01:13.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:13.200000", "end_time": "2023-02-08T12:01:45.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:45.200000", "end_time": "2023-02-08T12:02:17.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:17.200000", "end_time": "2023-02-08T12:02:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:49.200000", "end_time": "2023-02-08T12:03:21.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:21.100000", "end_time": "2023-02-08T12:03:53.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:53.100000", "end_time": "2023-02-08T12:04:25.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:04:25.100000", "end_time": "2023-02-08T12:04:57.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5"}]}""" + + +@pytest.fixture +def fake_cspp_workdir(tmp_path): + """Create a fake CSPP working dir.""" + wrkdir = tmp_path / 'work' + yield wrkdir + + +@pytest.fixture +def fake_atms_posttroll_message(): + """Create and return a Posttroll message for ATMS.""" + yield Message.decode(rawstr=str(TEST_ATMS_COLLECTION_MESSAGE)) + + +@pytest.fixture +def fake_yamlconfig_file(tmp_path): + """Write fake yaml config file.""" + file_path = tmp_path / 'test_atms_dr_config_config.yaml' + with open(file_path, 'w') as fpt: + fpt.write(TEST_YAML_CONFIG_CONTENT) + + yield file_path diff --git a/cspp_runner/tests/test_config.py b/cspp_runner/tests/test_config.py new file mode 100644 index 0000000..74232c0 --- /dev/null +++ b/cspp_runner/tests/test_config.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Adam.Dybbroe + +# Author(s): + +# Adam.Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +"""Test getting the yaml configurations from file.""" + +from cspp_runner.config import read_config + + +def test_get_yaml_configuration(fake_yamlconfig_file): + """Test read and get the yaml configuration from file.""" + config = read_config(fake_yamlconfig_file) + + assert len(config['subscribe_topics']) == 1 + assert config['subscribe_topics'][0] == '/file/atms/rdr' + assert len(config['publish_topics']) == 1 + assert config['publish_topics'][0] == '/file/atms/sdr' + + assert config['level1_home'] == '/path/to/where/the/atms/sdr/files/will/be/stored' + assert config['working_dir'] == '/san1/cspp/work' + assert config['atms_sdr_call'] == 'atms_sdr.sh' + assert config['atms_sdr_options'] == ['a', 'd'] diff --git a/cspp_runner/tests/test_run_atms.py b/cspp_runner/tests/test_run_atms.py new file mode 100644 index 0000000..073619d --- /dev/null +++ b/cspp_runner/tests/test_run_atms.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Pytroll developers + +# Author(s): + +# Adam Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Testing the ATMS processing.""" + + +import pytest +import os +import logging + +from cspp_runner.atms_rdr2sdr_runner import run_atms_from_message +from cspp_runner.atms_rdr2sdr_runner import get_filelist_from_collection + +ATMS_FILENAMES = ['RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5', + 'RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5'] + + +def test_run_atms_from_message(caplog, monkeypatch, fake_cspp_workdir, fake_atms_posttroll_message): + """Test launch and run the ATMS processing from a Posttroll message.""" + monkeypatch.setenv("CSPP_WORKDIR", str(fake_cspp_workdir)) + + msg = fake_atms_posttroll_message + + sdr_call = 'atms_sdr.sh' + sdr_options = ['-d', '-a'] + + with caplog.at_level(logging.INFO): + run_atms_from_message(msg, sdr_call, sdr_options) + + breakpoint() + x = 1 + + +def test_get_filelist_from_collection(fake_atms_posttroll_message): + """Test launch and run the ATMS processing from a Posttroll message.""" + collection = fake_atms_posttroll_message.data.get('collection') + + files = get_filelist_from_collection(collection) + + assert len(files) == 20 + bnames = [os.path.basename(item) for item in files] + assert bnames == ATMS_FILENAMES diff --git a/cspp_runner/viirs_dr_runner.py b/cspp_runner/viirs_dr_runner.py index a079336..2faf132 100644 --- a/cspp_runner/viirs_dr_runner.py +++ b/cspp_runner/viirs_dr_runner.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright (c) 2013 - 2021 cspp-runner developers +# Copyright (c) 2013 - 2023 Pytroll developers # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,6 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . + """Pytroll processing converting VIIRS RDR to SDR using CSPP. Level-1 processing for VIIRS Suomi NPP Direct Readout data. Using the CSPP @@ -50,8 +51,8 @@ def get_parser(): """Get parser for commandline-arguments.""" parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("-c", "--config-file", required=True, dest="config_file", @@ -112,9 +113,9 @@ def main(): ndays = int(OPTIONS.get("log_rotation_days", 1)) ncount = int(OPTIONS.get("log_rotation_backup", 7)) handler = logging.handlers.TimedRotatingFileHandler( - args.log, when='midnight', interval=ndays, - backupCount=ncount, encoding=None, - delay=False, utc=True) + args.log, when='midnight', interval=ndays, + backupCount=ncount, encoding=None, + delay=False, utc=True) handler.doRollover() else: @@ -129,26 +130,26 @@ def main(): logging.getLogger('posttroll').setLevel(logging.INFO) npp_rolling_runner( - thr_lut_files_age_days, - url_download_trial_frequency_hours, - lut_update_stampfile_prefix, - lut_dir, - url_jpss_remote_lut_dir, - OPTIONS["mirror_jpss_luts"], - url_jpss_remote_anc_dir, - anc_update_stampfile_prefix, - OPTIONS["mirror_jpss_ancillary"], - subscribe_topics, - site, - OPTIONS["mode"], - publish_topic, - OPTIONS["level1_home"], - viirs_sdr_call, - viirs_sdr_options, - int(OPTIONS.get("granule_time_tolerance", 10)), - int(OPTIONS.get("ncpus", 1)), - publisher_config=args.publisher, - ) + thr_lut_files_age_days, + url_download_trial_frequency_hours, + lut_update_stampfile_prefix, + lut_dir, + url_jpss_remote_lut_dir, + OPTIONS["mirror_jpss_luts"], + url_jpss_remote_anc_dir, + anc_update_stampfile_prefix, + OPTIONS["mirror_jpss_ancillary"], + subscribe_topics, + site, + OPTIONS["mode"], + publish_topic, + OPTIONS["level1_home"], + viirs_sdr_call, + viirs_sdr_options, + int(OPTIONS.get("granule_time_tolerance", 10)), + int(OPTIONS.get("ncpus", 1)), + publisher_config=args.publisher, + ) if __name__ == "__main__": diff --git a/examples/atms_dr_config.yaml_template b/examples/atms_dr_config.yaml_template new file mode 100644 index 0000000..e8c143a --- /dev/null +++ b/examples/atms_dr_config.yaml_template @@ -0,0 +1,22 @@ +# Location to store Sensor Data Record (SDR) files after CSPP SDR processing +# is completed. +level1_home: /path/to/where/the/atms/sdr/files/will/be/stored + +working_dir: /san1/cspp/work + +# CSPP-atms batch script and parameters: +atms_sdr_call: atms_sdr.sh + +# Options to pass to the viirs_sdr_call +# see viirs_sdr.sh --help for explanation +atms_sdr_options: + - a + - d + +# Topic to use for publishing posttroll messages +publish_topics: + - /file/atms/sdr + +# Posttroll topics to listen to (comma separated) +subscribe_topics: + - /file/atms/rdr diff --git a/examples/log_config.yaml_template b/examples/log_config.yaml_template new file mode 100644 index 0000000..c134fce --- /dev/null +++ b/examples/log_config.yaml_template @@ -0,0 +1,19 @@ +version: 1 +disable_existing_loggers: false +formatters: + pytroll: + format: '[%(asctime)s %(levelname)-8s %(name)s] %(message)s' +handlers: + console: + class: logging.StreamHandler + level: DEBUG + formatter: pytroll + stream: ext://sys.stdout +loggers: + posttroll: + level: ERROR + propagate: false + handlers: [console,] +root: + level: DEBUG + handlers: [console,] \ No newline at end of file diff --git a/setup.py b/setup.py index 972052b..a096f0f 100644 --- a/setup.py +++ b/setup.py @@ -42,8 +42,8 @@ setup(name=NAME, description=DESCRIPTION, - author='Adam Dybroe', - author_email='adam.dybroe@smhi.se', + author='The Pytroll Team', + author_email='pytroll@googlegroups.com', classifiers=["Development Status :: 3 - Alpha", "Intended Audience :: Science/Research", "License :: OSI Approved :: GNU General Public License v3 " + @@ -54,10 +54,9 @@ url="https://github.com/pytroll/pytroll-cspp-runner", long_description=long_description, packages=['cspp_runner', ], + scripts=['bin/atms_dr_runner.py'], data_files=[], install_requires=['posttroll>1.7', 'trollsift'], - # test_requires=['mock'], - # test_suite='cspp_runner.tests.suite', python_requires='>=3.8', zip_safe=False, use_scm_version=True From b423729ec6edb1dfabf3f8023153004b4665cbf8 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Wed, 8 Feb 2023 20:56:34 +0100 Subject: [PATCH 02/17] Add tests to cover the subprocess call Signed-off-by: Adam.Dybbroe --- cspp_runner/tests/conftest.py | 63 ++++++++++++++++++++++++++++-- cspp_runner/tests/test_run_atms.py | 24 +++++++++--- 2 files changed, 77 insertions(+), 10 deletions(-) diff --git a/cspp_runner/tests/conftest.py b/cspp_runner/tests/conftest.py index 485550a..4b88f08 100644 --- a/cspp_runner/tests/conftest.py +++ b/cspp_runner/tests/conftest.py @@ -24,6 +24,8 @@ import pytest from posttroll.message import Message +import stat + TEST_YAML_CONFIG_CONTENT = """# Location to store Sensor Data Record (SDR) files after CSPP SDR processing # is completed. @@ -52,11 +54,10 @@ TEST_ATMS_COLLECTION_MESSAGE = """pytroll://atms/rdr/0/gatherer collection safusr.u@lxserv1043.smhi.se 2023-02-08T12:06:01.560943 v1.01 application/json {"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T12:04:57.100000", "orbit_number": 27071, "platform_name": "NOAA-20", "sensor": "atms", "format": "RDR", "type": "HDF5", "data_processing_level": "0", "variant": "DR", "collection_area_id": "euron1", "collection": [{"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T11:54:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:54:49.200000", "end_time": "2023-02-08T11:55:21.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:21.200000", "end_time": "2023-02-08T11:55:53.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:53.200000", "end_time": "2023-02-08T11:56:25.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:25.200000", "end_time": "2023-02-08T11:56:57.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:57.200000", "end_time": "2023-02-08T11:57:29.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:57:29.200000", "end_time": "2023-02-08T11:58:01.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:01.200000", "end_time": "2023-02-08T11:58:33.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:33.200000", "end_time": "2023-02-08T11:59:05.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:05.200000", "end_time": "2023-02-08T11:59:37.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:37.200000", "end_time": "2023-02-08T12:00:09.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:09.200000", "end_time": "2023-02-08T12:00:41.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:41.200000", "end_time": "2023-02-08T12:01:13.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:13.200000", "end_time": "2023-02-08T12:01:45.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:45.200000", "end_time": "2023-02-08T12:02:17.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:17.200000", "end_time": "2023-02-08T12:02:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:49.200000", "end_time": "2023-02-08T12:03:21.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:21.100000", "end_time": "2023-02-08T12:03:53.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:53.100000", "end_time": "2023-02-08T12:04:25.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:04:25.100000", "end_time": "2023-02-08T12:04:57.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5"}]}""" -@pytest.fixture -def fake_cspp_workdir(tmp_path): +@pytest.fixture(scope="session") +def fake_cspp_workdir(tmp_path_factory): """Create a fake CSPP working dir.""" - wrkdir = tmp_path / 'work' - yield wrkdir + return tmp_path_factory.mktemp("work") @pytest.fixture @@ -73,3 +74,57 @@ def fake_yamlconfig_file(tmp_path): fpt.write(TEST_YAML_CONFIG_CONTENT) yield file_path + + +# Create the fake CSPP/ATMS bash script +FAKE_ATMS_BASH_SCRIPT = """#!/bin/bash + +if [ -z "$CSPP_SDR_HOME" ]; then + echo "CSPP_SDR_HOME must be set to the path where the CSPP software was installed." + echo "export CSPP_SDR_HOME=/home/me/SDR_x_x" + exit 1 +fi + +python $CSPP_SDR_HOME/atms/fake_adl_atms_script.py -vv "$@" || echo "ATMS SDR did not complete nominally" +""" + +FAKE_ATMS_PYTHON_MAIN_SCRIPT = """ +import argparse + +if __name__ == "__main__": + desc = "Dummy/fake program to mimic launching the CSPP/ATMS rdr-to-sdr script." + parser = argparse.ArgumentParser(description=desc) + + parser.add_argument('-a', '--aggregate', + action="store_true", default=False, help="aggregate products with nagg") + parser.add_argument('-d', '--debug', action="store_true", default=False, + help='enable debug mode on ADL and avoid cleaning workspace') + parser.add_argument('-v', '--verbosity', action="count", default=0, + help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG') + parser.add_argument('filenames', metavar='filename', type=str, nargs='+', + help='HDF5 ATMS RDR file/s to process') + + args = parser.parse_args() + + print(args.filenames) +""" + + +@pytest.fixture(scope="session") +def fake_adl_atms_scripts(tmp_path_factory): + """Create a fake cspp/atms bash and python main script.""" + cspp_home_dir = tmp_path_factory.mktemp('CSPP') + path = cspp_home_dir / 'atms' + path.mkdir() + filename = path / 'atms_sdr.sh' + with open(filename, 'w') as fpt: + fpt.write(FAKE_ATMS_BASH_SCRIPT) + + # Make executable: + filename.chmod(stat.S_IRWXU) + + filename = cspp_home_dir / 'atms' / 'fake_adl_atms_script.py' + with open(filename, 'w') as fpt: + fpt.write(FAKE_ATMS_PYTHON_MAIN_SCRIPT) + + yield cspp_home_dir diff --git a/cspp_runner/tests/test_run_atms.py b/cspp_runner/tests/test_run_atms.py index 073619d..0fe8680 100644 --- a/cspp_runner/tests/test_run_atms.py +++ b/cspp_runner/tests/test_run_atms.py @@ -52,20 +52,32 @@ 'RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5'] -def test_run_atms_from_message(caplog, monkeypatch, fake_cspp_workdir, fake_atms_posttroll_message): +def test_run_atms_from_message(caplog, monkeypatch, fake_cspp_workdir, + fake_atms_posttroll_message, fake_adl_atms_scripts): """Test launch and run the ATMS processing from a Posttroll message.""" monkeypatch.setenv("CSPP_WORKDIR", str(fake_cspp_workdir)) - msg = fake_atms_posttroll_message + cspp_homedir = fake_adl_atms_scripts + monkeypatch.setenv("CSPP_SDR_HOME", str(cspp_homedir)) + + mypath = cspp_homedir / 'atms' + path_env = os.environ.get('PATH') + monkeypatch.setenv("PATH", path_env + ":" + str(mypath)) sdr_call = 'atms_sdr.sh' sdr_options = ['-d', '-a'] - with caplog.at_level(logging.INFO): - run_atms_from_message(msg, sdr_call, sdr_options) + run_atms_from_message(fake_atms_posttroll_message, sdr_call, sdr_options) + + res = caplog.text.strip().split('\n') + assert len(res) == 4 + + for atmsfile in ATMS_FILENAMES: + assert atmsfile in res[0] + assert atmsfile in res[1] - breakpoint() - x = 1 + assert "Seconds process time:" in res[2] + assert "Seconds wall clock time:" in res[3] def test_get_filelist_from_collection(fake_atms_posttroll_message): From 6587ccdceb037e0c59cb420a3b725328624b71bd Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Thu, 9 Feb 2023 10:57:42 +0100 Subject: [PATCH 03/17] Add verbosity option Signed-off-by: Adam.Dybbroe --- bin/atms_dr_runner.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bin/atms_dr_runner.py b/bin/atms_dr_runner.py index 7ad4a85..686d1fb 100644 --- a/bin/atms_dr_runner.py +++ b/bin/atms_dr_runner.py @@ -65,6 +65,10 @@ def get_parser(): help="The file containing configuration parameters.") parser.add_argument("-l", "--log-config", help="Log config file to use instead of the standard logging.") + parser.add_argument("-v", "--verbose", dest="verbosity", action="count", default=0, + help="Verbosity (between 1 and 2 occurrences with more leading to more " + "verbose logging). WARN=0, INFO=1, " + "DEBUG=2. This is overridden by the log config file if specified.") return parser From 43a0f58fb4a9737212aad80919899eea072e89c5 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Thu, 9 Feb 2023 11:21:19 +0100 Subject: [PATCH 04/17] Remove pdb from code Signed-off-by: Adam.Dybbroe --- bin/atms_dr_runner.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/bin/atms_dr_runner.py b/bin/atms_dr_runner.py index 686d1fb..86d82cb 100644 --- a/bin/atms_dr_runner.py +++ b/bin/atms_dr_runner.py @@ -22,7 +22,6 @@ """Level-1 processing for Direct Readout S-NPP/JPSS ATMS data. - Using the CSPP level-1 processor from the SSEC, Wisconsin, based on the ADL software from NASA. Listen for pytroll messages of ready RDR files trigger processing on direct readout RDR data (granules or full swaths). @@ -31,15 +30,11 @@ import argparse import logging -import ast -import configparser import os import sys -import logging import logging.handlers from cspp_runner.atms_rdr2sdr_runner import AtmsSdrRunner -from cspp_runner.config import read_config from cspp_runner.logger import setup_logging CSPP_SDR_HOME = os.environ.get("CSPP_SDR_HOME", '') @@ -85,13 +80,6 @@ def main(): setup_logging(cmd_args) - #OPTIONS = read_config(cmd_args.config_file) - #publish_topics = OPTIONS.get('publish_topics') - #subscribe_topics = OPTIONS.get('subscribe_topics') - #viirs_sdr_options = OPTIONS.get("atms_sdr_options") - - breakpoint() - try: atms = AtmsSdrRunner(cmd_args.config_file) except Exception as err: From 9873a175efdce8e5db9ecfb9dc10db9fc078717d Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Thu, 9 Feb 2023 12:58:30 +0100 Subject: [PATCH 05/17] Fix call options for cspp/atms script Signed-off-by: Adam.Dybbroe --- cspp_runner/tests/conftest.py | 9 ++++++--- cspp_runner/tests/test_config.py | 2 +- examples/atms_dr_config.yaml_template | 4 ++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/cspp_runner/tests/conftest.py b/cspp_runner/tests/conftest.py index 4b88f08..a2b0583 100644 --- a/cspp_runner/tests/conftest.py +++ b/cspp_runner/tests/conftest.py @@ -39,8 +39,8 @@ # Options to pass to the viirs_sdr_call # see viirs_sdr.sh --help for explanation atms_sdr_options: - - a - - d + - '-a' + - '-d' # Topic to use for publishing posttroll messages publish_topics: @@ -51,7 +51,10 @@ - /file/atms/rdr """ -TEST_ATMS_COLLECTION_MESSAGE = """pytroll://atms/rdr/0/gatherer collection safusr.u@lxserv1043.smhi.se 2023-02-08T12:06:01.560943 v1.01 application/json {"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T12:04:57.100000", "orbit_number": 27071, "platform_name": "NOAA-20", "sensor": "atms", "format": "RDR", "type": "HDF5", "data_processing_level": "0", "variant": "DR", "collection_area_id": "euron1", "collection": [{"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T11:54:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:54:49.200000", "end_time": "2023-02-08T11:55:21.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:21.200000", "end_time": "2023-02-08T11:55:53.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:53.200000", "end_time": "2023-02-08T11:56:25.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:25.200000", "end_time": "2023-02-08T11:56:57.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:57.200000", "end_time": "2023-02-08T11:57:29.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:57:29.200000", "end_time": "2023-02-08T11:58:01.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:01.200000", "end_time": "2023-02-08T11:58:33.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:33.200000", "end_time": "2023-02-08T11:59:05.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:05.200000", "end_time": "2023-02-08T11:59:37.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:37.200000", "end_time": "2023-02-08T12:00:09.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:09.200000", "end_time": "2023-02-08T12:00:41.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:41.200000", "end_time": "2023-02-08T12:01:13.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:13.200000", "end_time": "2023-02-08T12:01:45.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:45.200000", "end_time": "2023-02-08T12:02:17.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:17.200000", "end_time": "2023-02-08T12:02:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:49.200000", "end_time": "2023-02-08T12:03:21.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:21.100000", "end_time": "2023-02-08T12:03:53.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:53.100000", "end_time": "2023-02-08T12:04:25.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:04:25.100000", "end_time": "2023-02-08T12:04:57.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5"}]}""" +TEST_ATMS_COLLECTION_MESSAGE = """pytroll://atms/rdr/0/gatherer collection safusr.u@lxserv1043.smhi.se 2023-02-08T12:06:01.560943 v1.01 application/json {"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T12:04:57.100000", "orbit_number": 27071, "platform_name": "NOAA-20", "sensor": "atms", "format": "RDR", "type": "HDF5", "data_processing_level": "0", "variant": "DR", "collection_area_id": "euron1", "collection": [{"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T11:54:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:54:49.200000", "end_time": "2023-02-08T11:55:21.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:21.200000", "end_time": "2023-02-08T11:55:53.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:53.200000", "end_time": "2023-02-08T11:56:25.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:25.200000", "end_time": "2023-02-08T11:56:57.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:57.200000", "end_time": "2023-02-08T11:57:29.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:57:29.200000", "end_time": "2023-02-08T11:58:01.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:01.200000", "end_time": "2023-02-08T11:58:33.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:33.200000", "end_time": "2023-02-08T11:59:05.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:05.200000", "end_time": "2023-02-08T11:59:37.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:37.200000", "end_time": "2023-02-08T12:00:09.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:09.200000", "end_time": "2023-02-08T12:00:41.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:41.200000", "end_time": "2023-02-08T12:01:13.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:13.200000", "end_time": "2023-02-08T12:01:45.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:45.200000", "end_time": "2023-02-08T12:02:17.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:17.200000", "end_time": "2023-02-08T12:02:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:49.200000", "end_time": "2023-02-08T12:03:21.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:21.100000", "end_time": "2023-02-08T12:03:53.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:53.100000", "end_time": "2023-02-08T12:04:25.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:04:25.100000", "end_time": "2023-02-08T12:04:57.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5"}]}""" # noqa + + +TEST_ATMS_FILE_MESSAGE = """pytroll://atms/rdr/0/file file safusr.u@lxserv1043.smhi.se 2023-02-08T12:06:01.560943 v1.01 application/json {'start_time': datetime.datetime(2023, 2, 9, 9, 6, 10), 'end_time': datetime.datetime(2023, 2, 9, 9, 6, 42), 'orbit_number': 58481, 'platform_name': 'Suomi-NPP', 'sensor': 'atms', 'format': 'RDR', 'type': 'HDF5', 'data_processing_level': '0', 'uid': 'RATMS-RNSCA_npp_d20230209_t0906100_e0906420_b00001_c20230209090700529000_drlu_ops.h5', 'uri': 'ssh://172.29.1.52/san1/polar_in/direct_readout/npp/lvl0/RATMS-RNSCA_npp_d20230209_t0906100_e0906420_b00001_c20230209090700529000_drlu_ops.h5', 'variant': 'DR'}""" # noqa @pytest.fixture(scope="session") diff --git a/cspp_runner/tests/test_config.py b/cspp_runner/tests/test_config.py index 74232c0..fc150d2 100644 --- a/cspp_runner/tests/test_config.py +++ b/cspp_runner/tests/test_config.py @@ -38,4 +38,4 @@ def test_get_yaml_configuration(fake_yamlconfig_file): assert config['level1_home'] == '/path/to/where/the/atms/sdr/files/will/be/stored' assert config['working_dir'] == '/san1/cspp/work' assert config['atms_sdr_call'] == 'atms_sdr.sh' - assert config['atms_sdr_options'] == ['a', 'd'] + assert config['atms_sdr_options'] == ['-a', '-d'] diff --git a/examples/atms_dr_config.yaml_template b/examples/atms_dr_config.yaml_template index e8c143a..d71c714 100644 --- a/examples/atms_dr_config.yaml_template +++ b/examples/atms_dr_config.yaml_template @@ -10,8 +10,8 @@ atms_sdr_call: atms_sdr.sh # Options to pass to the viirs_sdr_call # see viirs_sdr.sh --help for explanation atms_sdr_options: - - a - - d + - '-a' + - '-d' # Topic to use for publishing posttroll messages publish_topics: From d41746acfb33a67440187c6d14c316c45a288ae0 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Fri, 10 Feb 2023 12:12:11 +0100 Subject: [PATCH 06/17] create and send output messages when ATMS processing is ready Signed-off-by: Adam.Dybbroe --- cspp_runner/atms_rdr2sdr_runner.py | 155 ++++++++++++++++++++++---- cspp_runner/constants.py | 31 ++++++ cspp_runner/tests/conftest.py | 56 +++++++++- cspp_runner/tests/test_config.py | 2 + cspp_runner/tests/test_run_atms.py | 69 +++++++++++- examples/atms_dr_config.yaml_template | 11 ++ 6 files changed, 302 insertions(+), 22 deletions(-) create mode 100644 cspp_runner/constants.py diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py index a112f42..0122bd4 100644 --- a/cspp_runner/atms_rdr2sdr_runner.py +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -20,12 +20,19 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -""" -""" +"""Run the ATMS RDR to SDR processing withy CSPP on incoming DR data.""" + + import socket from trollsift import Parser, globify import os +import shutil + import pathlib +import tempfile +from glob import glob +from datetime import datetime + import subprocess from urllib.parse import urlparse import logging @@ -39,6 +46,9 @@ from posttroll.publisher import NoisyPublisher from cspp_runner.config import read_config from cspp_runner.runner import check_environment +from cspp_runner.constants import (PLATFORM_SHORTNAMES, + PLATFORM_LONGNAMES) + logger = logging.getLogger(__name__) @@ -55,6 +65,11 @@ def __init__(self, configfile): config = read_config(self.configfile) self.options = config + self.host = socket.gethostname() + + self.sdr_file_patterns = self.options['sdr_file_patterns'] + self._sdr_home = self.options['level1_home'] + self.input_topics = self.options['subscribe_topics'] self.output_topics = self.options['publish_topics'] @@ -95,31 +110,125 @@ def run(self): logger.debug("Message type not supported: %s", str(msg.type)) continue - #platform_name = msg.data.get('platform_name') - wrkdir = run_atms_from_message(msg, self._atms_sdr_call, self._atms_sdr_options) - logger.warning("Do nothing...") + logger.info("ATMS RDR to SDR processing finished") + logger.debug("Start packing the files and publish") - # filename = get_filename_from_uri(msg.data.get('uri')) - # if not os.path.exists(filename): - # logger.warning("File does not exist!") - # continue + sdr_filepaths = get_filepaths(wrkdir, msg.data, self.sdr_file_patterns) - # file_ok = check_file_type_okay(msg.data.get('type')) + dest_sdr_files = move_files_to_destination(sdr_filepaths, self.sdr_file_patterns, self._sdr_home) - # for output_msg in output_messages: - # if output_msg: - # logger.debug("Sending message: %s", str(output_msg)) - # self.publisher.send(str(output_msg)) + output_messages = self._get_output_messages(dest_sdr_files, msg) + for output_msg in output_messages: + if output_msg: + logger.debug("Sending message: %s", str(output_msg)) + self.publisher.send(str(output_msg)) + def _get_output_messages(self, sdr_files, input_msg): + """Generate output messages from SDR files and input message, and return.""" + out_messages = [] + for topic in self.output_topics: + to_send = prepare_posttroll_message(input_msg) + dataset = [] + for filepath in sdr_files: + sdrfile = {} + sdrfile['uri'] = 'ssh://{host}/{path}'.format(host=self.host, path=filepath) + sdrfile['uid'] = os.path.basename(filepath) + dataset.append(sdrfile) -def run_atms_from_message(posttroll_msg, sdr_call, sdr_options): - """Trigger ATMS processing on ATMS scene, from Posttroll message.""" + to_send['type'] = 'HDF5' + to_send['format'] = 'SDR' + to_send['data_processing_level'] = '1B' + to_send['dataset'] = dataset + + pubmsg = Message(topic, 'dataset', to_send) + out_messages.append(pubmsg) + + return out_messages + + +def prepare_posttroll_message(input_msg): + """Create the basic posttroll-message fields and return.""" + to_send = input_msg.data.copy() + to_send.pop('dataset', None) + to_send.pop('collection', None) + to_send.pop('uri', None) + to_send.pop('uid', None) + to_send.pop('format', None) + to_send.pop('type', None) + + return to_send + + +def move_files_to_destination(sdr_filepaths, sdr_file_patterns, sdr_home): + """Move the SDR files from tmp-directory to a final destination.""" + dirpath = create_subdir_from_filepaths(sdr_filepaths, sdr_file_patterns, sdr_home) + for filename in sdr_filepaths: + shutil.move(filename, dirpath) + + return glob(str(dirpath / "*")) - platform_name = posttroll_msg.data.get('platform_name') - sensor = posttroll_msg.data.get('sensor') +def create_subdir_from_filepaths(sdr_filepaths, sdr_file_patterns, sdr_home): + """From the list of SDR files create a sub-directory where files should be moved.""" + s_pattern = None + for pattern in sdr_file_patterns: + if pattern.startswith('S'): + s_pattern = pattern + break + + start_time = datetime.now() + p__ = Parser(s_pattern) + + orbit = None + platform = None + for filename in sdr_filepaths: + bname = os.path.basename(str(filename)) + try: + result = p__.parse(bname) + except ValueError: + continue + + stime = result['start_time'] + if stime < start_time: + start_time = stime + orbit = result['orbit'] + platform = PLATFORM_LONGNAMES.get(result['platform_shortname']) + + subdirname = "{platform}_{dtime:%Y%m%d_%H%M}_{orbit:05d}".format(platform=platform.lower().replace('-', ''), + dtime=start_time, orbit=orbit) + if isinstance(sdr_home, str): + sdr_home = pathlib.Path(sdr_home) + + dirpath = sdr_home / subdirname + dirpath.mkdir() + return dirpath + + +def get_filepaths(directory, msg_data, file_patterns): + """Identify the ATMS files output from CSPP and return filepaths.""" + files = [] + for pattern in file_patterns: + # p__ = Parser(pattern) + mda = {'orbit': msg_data['orbit_number'], + 'platform_shortname': PLATFORM_SHORTNAMES.get(msg_data['platform_name'])} + + # 'start_time': msg_data['start_time']} Here check for times, if + # start/end times in the file names are sufficiently close to the + # actual ones from the messages + + glbstr = globify(pattern, mda) + flist = glob(os.path.join(directory, glbstr)) + files = files + flist + + return files + + +def run_atms_from_message(posttroll_msg, sdr_call, sdr_options): + """Trigger ATMS processing on ATMS scene, from Posttroll message.""" + # platform_name = posttroll_msg.data.get('platform_name') + # sensor = posttroll_msg.data.get('sensor') collection = posttroll_msg.data.get('collection') if collection: atms_rdr_files = get_filelist_from_collection(collection) @@ -132,6 +241,13 @@ def run_atms_from_message(posttroll_msg, sdr_call, sdr_options): cspp_workdir = os.environ.get("CSPP_WORKDIR", '') pathlib.Path(cspp_workdir).mkdir(parents=True, exist_ok=True) + try: + working_dir = tempfile.mkdtemp(dir=cspp_workdir) + except OSError: + working_dir = tempfile.mkdtemp() + + os.environ["CSPP_WORKDIR"] = working_dir + my_env = os.environ.copy() # Run the command: @@ -143,7 +259,8 @@ def run_atms_from_message(posttroll_msg, sdr_call, sdr_options): t0_wall = time.time() logger.info("Popen call arguments: " + str(cmdlist)) - sdr_proc = subprocess.Popen(cmdlist, cwd=cspp_workdir, + sdr_proc = subprocess.Popen(cmdlist, cwd=working_dir, + env=my_env, stderr=subprocess.PIPE, stdout=subprocess.PIPE) while True: diff --git a/cspp_runner/constants.py b/cspp_runner/constants.py new file mode 100644 index 0000000..f1388e6 --- /dev/null +++ b/cspp_runner/constants.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Adam.Dybbroe + +# Author(s): + +# Adam.Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Constants and utilities for name conversions etc.""" + + +PLATFORM_SHORTNAMES = {'NOAA-20': 'j01', + 'NOAA-21': 'j02', + 'Suomi-NPP': 'npp'} +PLATFORM_LONGNAMES = {'j01': 'NOAA-20', + 'j02': 'NOAA-21', + 'npp': 'Suomi-NPP'} diff --git a/cspp_runner/tests/conftest.py b/cspp_runner/tests/conftest.py index a2b0583..85da853 100644 --- a/cspp_runner/tests/conftest.py +++ b/cspp_runner/tests/conftest.py @@ -31,6 +31,17 @@ # is completed. level1_home: /path/to/where/the/atms/sdr/files/will/be/stored +# Examples: +# TATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123846465182_cspp_dev.h5 +# SATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123844714309_cspp_dev.h5 +# GATMO_npp_d20230209_t1047306_e1236183_b58482_c20230209123847932256_cspp_dev.h5 + +sdr_file_patterns: + - 'SATMS_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + - 'GATMO_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + - 'TATMS_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + + working_dir: /san1/cspp/work # CSPP-atms batch script and parameters: @@ -49,7 +60,7 @@ # Posttroll topics to listen to (comma separated) subscribe_topics: - /file/atms/rdr -""" +""" # noqa TEST_ATMS_COLLECTION_MESSAGE = """pytroll://atms/rdr/0/gatherer collection safusr.u@lxserv1043.smhi.se 2023-02-08T12:06:01.560943 v1.01 application/json {"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T12:04:57.100000", "orbit_number": 27071, "platform_name": "NOAA-20", "sensor": "atms", "format": "RDR", "type": "HDF5", "data_processing_level": "0", "variant": "DR", "collection_area_id": "euron1", "collection": [{"start_time": "2023-02-08T11:54:17.200000", "end_time": "2023-02-08T11:54:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:54:49.200000", "end_time": "2023-02-08T11:55:21.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:21.200000", "end_time": "2023-02-08T11:55:53.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155212_e1155532_b00001_c20230208115558220000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:55:53.200000", "end_time": "2023-02-08T11:56:25.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1155532_e1156252_b00001_c20230208115638170000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:25.200000", "end_time": "2023-02-08T11:56:57.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156252_e1156572_b00001_c20230208115718069000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:56:57.200000", "end_time": "2023-02-08T11:57:29.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1156572_e1157292_b00001_c20230208115738216000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:57:29.200000", "end_time": "2023-02-08T11:58:01.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1157292_e1158012_b00001_c20230208115818194000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:01.200000", "end_time": "2023-02-08T11:58:33.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158012_e1158332_b00001_c20230208115837985000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:58:33.200000", "end_time": "2023-02-08T11:59:05.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1158332_e1159052_b00001_c20230208115918341000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:05.200000", "end_time": "2023-02-08T11:59:37.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159052_e1159372_b00001_c20230208115957983000_drlu_ops.h5"}, {"start_time": "2023-02-08T11:59:37.200000", "end_time": "2023-02-08T12:00:09.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1159372_e1200092_b00001_c20230208120017590000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:09.200000", "end_time": "2023-02-08T12:00:41.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200092_e1200412_b00001_c20230208120058642000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:00:41.200000", "end_time": "2023-02-08T12:01:13.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1200412_e1201132_b00001_c20230208120118131000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:13.200000", "end_time": "2023-02-08T12:01:45.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201132_e1201452_b00001_c20230208120157708000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:01:45.200000", "end_time": "2023-02-08T12:02:17.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1201452_e1202172_b00001_c20230208120238505000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:17.200000", "end_time": "2023-02-08T12:02:49.200000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202172_e1202492_b00001_c20230208120258137000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:02:49.200000", "end_time": "2023-02-08T12:03:21.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1202492_e1203211_b00001_c20230208120337761000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:21.100000", "end_time": "2023-02-08T12:03:53.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203211_e1203531_b00001_c20230208120358165000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:03:53.100000", "end_time": "2023-02-08T12:04:25.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1203531_e1204251_b00001_c20230208120437760000_drlu_ops.h5"}, {"start_time": "2023-02-08T12:04:25.100000", "end_time": "2023-02-08T12:04:57.100000", "uri": "ssh://172.29.1.52/path/to/jpss/atms/rdr/RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5", "uid": "RATMS-RNSCA_j01_d20230208_t1204251_e1204571_b00001_c20230208120518174000_drlu_ops.h5"}]}""" # noqa @@ -131,3 +142,46 @@ def fake_adl_atms_scripts(tmp_path_factory): fpt.write(FAKE_ATMS_PYTHON_MAIN_SCRIPT) yield cspp_home_dir + + +SINGLE_ATMS_FILESET = [ + "TATMS_j01_d20230209_t1317546_e1325223_b27088_c20230209132641834076_cspp_dev.h5", + "SATMS_j01_d20230209_t1317546_e1325223_b27088_c20230209132641621751_cspp_dev.h5", + "GATMO_j01_d20230209_t1317546_e1325223_b27088_c20230209132642207625_cspp_dev.h5" +] + + +FAKE_ATMS_SDR_FILENAMES = [ + "TATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123846465182_cspp_dev.h5", + "SATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123844714309_cspp_dev.h5", + "GATMO_npp_d20230209_t1047306_e1236183_b58482_c20230209123847932256_cspp_dev.h5", +] +FAKE_ATMS_SDR_FILENAMES = FAKE_ATMS_SDR_FILENAMES + SINGLE_ATMS_FILESET + + +@pytest.fixture +def fake_atms_sdr_files_several_passes(tmp_path): + """Make fake ATMS SDR files.""" + for fname in FAKE_ATMS_SDR_FILENAMES: + filepath = tmp_path / fname + filepath.touch() + + yield tmp_path + + +@pytest.fixture +def fake_atms_sdr_files_one_pass(tmp_path_factory): + """Make fake ATMS SDR files.""" + dirname = tmp_path_factory.mktemp('data') + for fname in SINGLE_ATMS_FILESET: + filepath = dirname / fname + filepath.touch() + + yield dirname + + +@pytest.fixture +def fake_sdr_homedir(tmp_path_factory): + """Make a fake home directory for the SDR files.""" + dirname = tmp_path_factory.mktemp('sdr_home') + yield dirname diff --git a/cspp_runner/tests/test_config.py b/cspp_runner/tests/test_config.py index fc150d2..16108c9 100644 --- a/cspp_runner/tests/test_config.py +++ b/cspp_runner/tests/test_config.py @@ -39,3 +39,5 @@ def test_get_yaml_configuration(fake_yamlconfig_file): assert config['working_dir'] == '/san1/cspp/work' assert config['atms_sdr_call'] == 'atms_sdr.sh' assert config['atms_sdr_options'] == ['-a', '-d'] + + assert len(config['sdr_file_patterns']) == 3 diff --git a/cspp_runner/tests/test_run_atms.py b/cspp_runner/tests/test_run_atms.py index 0fe8680..ddf1ddb 100644 --- a/cspp_runner/tests/test_run_atms.py +++ b/cspp_runner/tests/test_run_atms.py @@ -22,13 +22,20 @@ """Testing the ATMS processing.""" - -import pytest import os import logging +from glob import glob +from datetime import datetime + +from trollsift import Parser + +from cspp_runner.config import read_config +from cspp_runner.atms_rdr2sdr_runner import get_filepaths from cspp_runner.atms_rdr2sdr_runner import run_atms_from_message from cspp_runner.atms_rdr2sdr_runner import get_filelist_from_collection +from cspp_runner.atms_rdr2sdr_runner import move_files_to_destination + ATMS_FILENAMES = ['RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5', 'RATMS-RNSCA_j01_d20230208_t1154492_e1155212_b00001_c20230208115538023000_drlu_ops.h5', @@ -89,3 +96,61 @@ def test_get_filelist_from_collection(fake_atms_posttroll_message): assert len(files) == 20 bnames = [os.path.basename(item) for item in files] assert bnames == ATMS_FILENAMES + + +def test_get_filepaths(fake_yamlconfig_file, fake_atms_sdr_files_several_passes): + """Test get the filepaths of the ATMS SDR files produced from CSPP.""" + config = read_config(fake_yamlconfig_file) + + patterns = config['sdr_file_patterns'] + fake_message_data = {} + fake_message_data['orbit_number'] = 27088 + fake_message_data['platform_name'] = 'NOAA-20' + fake_message_data['start_time'] = datetime.strptime("2023-02-09T13:17:20.600000", "%Y-%m-%dT%H:%M:%S.%f") + fake_message_data['end_time'] = datetime.strptime("2023-02-09T13:25:52.600000", "%Y-%m-%dT%H:%M:%S.%f") + files = get_filepaths(str(fake_atms_sdr_files_several_passes), fake_message_data, patterns) + + assert len(files) == 3 + p__ = Parser(patterns[0]) + result = p__.parse(os.path.basename(files[0])) + assert result['platform_shortname'] == 'j01' + assert result['orbit'] == 27088 + assert result['start_time'] == datetime(2023, 2, 9, 13, 17, 54, 600000) + + +def test_move_files_to_destination_dir_is_str(fake_yamlconfig_file, fake_sdr_homedir, fake_atms_sdr_files_one_pass): + """Test move the ATMS SDR files to a destination dir.""" + config = read_config(fake_yamlconfig_file) + patterns = config['sdr_file_patterns'] + + sdr_file_paths = glob(str(fake_atms_sdr_files_one_pass / '*h5')) + expected = [os.path.basename(f) for f in sdr_file_paths] + expected.sort() + + filelist = move_files_to_destination(sdr_file_paths, patterns, str(fake_sdr_homedir)) + + assert len(filelist) == 3 + assert os.path.basename(os.path.normpath(os.path.dirname(filelist[0]))) == "noaa20_20230209_1317_27088" + bnames = [os.path.basename(f) for f in filelist] + bnames.sort() + + assert bnames == expected + + +def test_move_files_to_destination_pathlib(fake_yamlconfig_file, fake_sdr_homedir, fake_atms_sdr_files_one_pass): + """Test move the ATMS SDR files to a destination dir.""" + config = read_config(fake_yamlconfig_file) + patterns = config['sdr_file_patterns'] + + sdr_file_paths = glob(str(fake_atms_sdr_files_one_pass / '*h5')) + expected = [os.path.basename(f) for f in sdr_file_paths] + expected.sort() + + filelist = move_files_to_destination(sdr_file_paths, patterns, fake_sdr_homedir) + + assert len(filelist) == 3 + assert os.path.basename(os.path.normpath(os.path.dirname(filelist[0]))) == "noaa20_20230209_1317_27088" + bnames = [os.path.basename(f) for f in filelist] + bnames.sort() + + assert bnames == expected diff --git a/examples/atms_dr_config.yaml_template b/examples/atms_dr_config.yaml_template index d71c714..9ade552 100644 --- a/examples/atms_dr_config.yaml_template +++ b/examples/atms_dr_config.yaml_template @@ -2,6 +2,17 @@ # is completed. level1_home: /path/to/where/the/atms/sdr/files/will/be/stored +# Examples: +# TATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123846465182_cspp_dev.h5 +# SATMS_npp_d20230209_t1047306_e1236183_b58482_c20230209123844714309_cspp_dev.h5 +# GATMO_npp_d20230209_t1047306_e1236183_b58482_c20230209123847932256_cspp_dev.h5 + +sdr_file_patterns: + - 'SATMS_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + - 'GATMO_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + - 'TATMS_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_time:%H%M%S%f}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' + + working_dir: /san1/cspp/work # CSPP-atms batch script and parameters: From fb12b2a6c4c4c94dc0b1e5e0584734082892efbb Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Fri, 10 Feb 2023 12:28:03 +0100 Subject: [PATCH 07/17] Add missing stop method Signed-off-by: Adam.Dybbroe --- cspp_runner/atms_rdr2sdr_runner.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py index 0122bd4..4b698e8 100644 --- a/cspp_runner/atms_rdr2sdr_runner.py +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -147,6 +147,20 @@ def _get_output_messages(self, sdr_files, input_msg): return out_messages + def close(self): + """Shutdown the ATMS SDR processing.""" + logger.info('Terminating ATMS RDR to SDR processing.') + self.loop = False + try: + self.listener.stop() + except Exception: + logger.exception("Couldn't stop listener.") + if self.publisher: + try: + self.publisher.stop() + except Exception: + logger.exception("Couldn't stop publisher.") + def prepare_posttroll_message(input_msg): """Create the basic posttroll-message fields and return.""" From 17c4a48b1663d8aca0a00a66e14143fa4e6b3dd4 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Fri, 10 Feb 2023 14:28:38 +0100 Subject: [PATCH 08/17] Add more debug info Signed-off-by: Adam.Dybbroe --- cspp_runner/atms_rdr2sdr_runner.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py index 4b698e8..58edf13 100644 --- a/cspp_runner/atms_rdr2sdr_runner.py +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -116,8 +116,8 @@ def run(self): logger.debug("Start packing the files and publish") sdr_filepaths = get_filepaths(wrkdir, msg.data, self.sdr_file_patterns) - - dest_sdr_files = move_files_to_destination(sdr_filepaths, self.sdr_file_patterns, self._sdr_home) + dest_sdr_files = move_files_to_destination(sdr_filepaths, + self.sdr_file_patterns, self._sdr_home) output_messages = self._get_output_messages(dest_sdr_files, msg) for output_msg in output_messages: @@ -195,19 +195,20 @@ def create_subdir_from_filepaths(sdr_filepaths, sdr_file_patterns, sdr_home): start_time = datetime.now() p__ = Parser(s_pattern) - orbit = None - platform = None + orbit = 0 + platform = 'unknown' for filename in sdr_filepaths: bname = os.path.basename(str(filename)) + logger.debug("SDR filename: %s", str(bname)) try: result = p__.parse(bname) except ValueError: continue - stime = result['start_time'] - if stime < start_time: + stime = result.get('start_time') + if stime and stime < start_time: start_time = stime - orbit = result['orbit'] + orbit = result.get('orbit', 0) platform = PLATFORM_LONGNAMES.get(result['platform_shortname']) subdirname = "{platform}_{dtime:%Y%m%d_%H%M}_{orbit:05d}".format(platform=platform.lower().replace('-', ''), @@ -233,9 +234,11 @@ def get_filepaths(directory, msg_data, file_patterns): # actual ones from the messages glbstr = globify(pattern, mda) + logger.debug("Glob-string = %s", str(glbstr)) flist = glob(os.path.join(directory, glbstr)) files = files + flist + logger.debug("Files: %s", str(files)) return files From deb3a032988e7370930a262120ed1e4388e519c9 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Fri, 10 Feb 2023 15:25:06 +0100 Subject: [PATCH 09/17] Fix correct orbit number in output messages Signed-off-by: Adam.Dybbroe --- cspp_runner/atms_rdr2sdr_runner.py | 48 ++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py index 58edf13..7c24032 100644 --- a/cspp_runner/atms_rdr2sdr_runner.py +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -119,13 +119,37 @@ def run(self): dest_sdr_files = move_files_to_destination(sdr_filepaths, self.sdr_file_patterns, self._sdr_home) - output_messages = self._get_output_messages(dest_sdr_files, msg) + orbit_number = self._fix_orbit_number(dest_sdr_files, self.sdr_file_patterns) + output_messages = self._get_output_messages(dest_sdr_files, msg, orbit_number) + for output_msg in output_messages: if output_msg: logger.debug("Sending message: %s", str(output_msg)) self.publisher.send(str(output_msg)) - def _get_output_messages(self, sdr_files, input_msg): + def _fix_orbit_number(self, sdr_files, sdr_file_patterns): + """Get the orbit number from the SDR files produced with CSPP.""" + s_pattern = get_tb_files_pattern(sdr_file_patterns) + + p__ = Parser(s_pattern) + orbit_numbers = [] + for filename in sdr_files: + bname = os.path.basename(str(filename)) + logger.debug("SDR filename: %s", str(bname)) + try: + result = p__.parse(bname) + except ValueError: + continue + + orbit = result.get('orbit', 0) + logger.debug("Orbit number = %s", orbit) + orbit_numbers.append(orbit) + + # Test if there are more orbit numbers and at least log an info. + # FIXME! + return orbit_numbers[0] + + def _get_output_messages(self, sdr_files, input_msg, orbit_number): """Generate output messages from SDR files and input message, and return.""" out_messages = [] for topic in self.output_topics: @@ -141,6 +165,8 @@ def _get_output_messages(self, sdr_files, input_msg): to_send['format'] = 'SDR' to_send['data_processing_level'] = '1B' to_send['dataset'] = dataset + to_send['orig_orbit_number'] = to_send.get('orbit_number') + to_send['orbit_number'] = orbit_number pubmsg = Message(topic, 'dataset', to_send) out_messages.append(pubmsg) @@ -184,13 +210,16 @@ def move_files_to_destination(sdr_filepaths, sdr_file_patterns, sdr_home): return glob(str(dirpath / "*")) -def create_subdir_from_filepaths(sdr_filepaths, sdr_file_patterns, sdr_home): - """From the list of SDR files create a sub-directory where files should be moved.""" - s_pattern = None +def get_tb_files_pattern(sdr_file_patterns): + """Get the file name pattern for the TB SDR files (SATMS).""" for pattern in sdr_file_patterns: if pattern.startswith('S'): - s_pattern = pattern - break + return pattern + + +def create_subdir_from_filepaths(sdr_filepaths, sdr_file_patterns, sdr_home): + """From the list of SDR files create a sub-directory where files should be moved.""" + s_pattern = get_tb_files_pattern(sdr_file_patterns) start_time = datetime.now() p__ = Parser(s_pattern) @@ -226,8 +255,9 @@ def get_filepaths(directory, msg_data, file_patterns): files = [] for pattern in file_patterns: # p__ = Parser(pattern) - mda = {'orbit': msg_data['orbit_number'], - 'platform_shortname': PLATFORM_SHORTNAMES.get(msg_data['platform_name'])} + # mda = {'orbit': msg_data['orbit_number'], + # 'platform_shortname': PLATFORM_SHORTNAMES.get(msg_data['platform_name'])} + mda = {'platform_shortname': PLATFORM_SHORTNAMES.get(msg_data['platform_name'])} # 'start_time': msg_data['start_time']} Here check for times, if # start/end times in the file names are sufficiently close to the From 37a2d292a2c1425048a5e9455654ffb53a0af8fb Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Fri, 10 Feb 2023 15:58:44 +0100 Subject: [PATCH 10/17] Bugfix, and refactor orbit number estimation Signed-off-by: Adam.Dybbroe --- cspp_runner/atms_rdr2sdr_runner.py | 52 ++++++++++++++++-------------- cspp_runner/tests/test_run_atms.py | 15 ++++++++- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py index 7c24032..5886acd 100644 --- a/cspp_runner/atms_rdr2sdr_runner.py +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -116,10 +116,13 @@ def run(self): logger.debug("Start packing the files and publish") sdr_filepaths = get_filepaths(wrkdir, msg.data, self.sdr_file_patterns) + logger.debug("Files: %s", str(sdr_filepaths)) + dest_sdr_files = move_files_to_destination(sdr_filepaths, self.sdr_file_patterns, self._sdr_home) + logger.debug("Files after having been moved: %s", str(dest_sdr_files)) - orbit_number = self._fix_orbit_number(dest_sdr_files, self.sdr_file_patterns) + orbit_number = _fix_orbit_number(dest_sdr_files, self.sdr_file_patterns) output_messages = self._get_output_messages(dest_sdr_files, msg, orbit_number) for output_msg in output_messages: @@ -127,28 +130,6 @@ def run(self): logger.debug("Sending message: %s", str(output_msg)) self.publisher.send(str(output_msg)) - def _fix_orbit_number(self, sdr_files, sdr_file_patterns): - """Get the orbit number from the SDR files produced with CSPP.""" - s_pattern = get_tb_files_pattern(sdr_file_patterns) - - p__ = Parser(s_pattern) - orbit_numbers = [] - for filename in sdr_files: - bname = os.path.basename(str(filename)) - logger.debug("SDR filename: %s", str(bname)) - try: - result = p__.parse(bname) - except ValueError: - continue - - orbit = result.get('orbit', 0) - logger.debug("Orbit number = %s", orbit) - orbit_numbers.append(orbit) - - # Test if there are more orbit numbers and at least log an info. - # FIXME! - return orbit_numbers[0] - def _get_output_messages(self, sdr_files, input_msg, orbit_number): """Generate output messages from SDR files and input message, and return.""" out_messages = [] @@ -188,6 +169,29 @@ def close(self): logger.exception("Couldn't stop publisher.") +def _fix_orbit_number(sdr_files, sdr_file_patterns): + """Get the orbit number from the SDR files produced with CSPP.""" + s_pattern = get_tb_files_pattern(sdr_file_patterns) + + p__ = Parser(s_pattern) + orbit_numbers = [] + for filename in sdr_files: + bname = os.path.basename(str(filename)) + logger.debug("SDR filename: %s", str(bname)) + try: + result = p__.parse(bname) + except ValueError: + continue + + orbit = result.get('orbit', 0) + logger.debug("Orbit number = %s", orbit) + orbit_numbers.append(orbit) + + # Test if there are more orbit numbers and at least log an info. + # FIXME! + return orbit_numbers[0] + + def prepare_posttroll_message(input_msg): """Create the basic posttroll-message fields and return.""" to_send = input_msg.data.copy() @@ -327,7 +331,7 @@ def run_atms_from_message(posttroll_msg, sdr_call, sdr_options): sdr_proc.poll() - return cspp_workdir + return working_dir def get_filelist_from_collection(atms_collection): diff --git a/cspp_runner/tests/test_run_atms.py b/cspp_runner/tests/test_run_atms.py index ddf1ddb..d304c88 100644 --- a/cspp_runner/tests/test_run_atms.py +++ b/cspp_runner/tests/test_run_atms.py @@ -35,6 +35,7 @@ from cspp_runner.atms_rdr2sdr_runner import run_atms_from_message from cspp_runner.atms_rdr2sdr_runner import get_filelist_from_collection from cspp_runner.atms_rdr2sdr_runner import move_files_to_destination +from cspp_runner.atms_rdr2sdr_runner import _fix_orbit_number ATMS_FILENAMES = ['RATMS-RNSCA_j01_d20230208_t1154172_e1154492_b00001_c20230208115457829000_drlu_ops.h5', @@ -74,8 +75,9 @@ def test_run_atms_from_message(caplog, monkeypatch, fake_cspp_workdir, sdr_call = 'atms_sdr.sh' sdr_options = ['-d', '-a'] with caplog.at_level(logging.INFO): - run_atms_from_message(fake_atms_posttroll_message, sdr_call, sdr_options) + dirpath = run_atms_from_message(fake_atms_posttroll_message, sdr_call, sdr_options) + assert os.path.dirname(dirpath) == str(fake_cspp_workdir) res = caplog.text.strip().split('\n') assert len(res) == 4 @@ -154,3 +156,14 @@ def test_move_files_to_destination_pathlib(fake_yamlconfig_file, fake_sdr_homedi bnames.sort() assert bnames == expected + + +def test_fix_orbit_number(fake_yamlconfig_file, fake_atms_sdr_files_one_pass): + """Test fixing the orbit number from the SDR filenames.""" + config = read_config(fake_yamlconfig_file) + patterns = config['sdr_file_patterns'] + + sdr_files = glob(str(fake_atms_sdr_files_one_pass / '*h5')) + + result = _fix_orbit_number(sdr_files, patterns) + assert result == 27088 From 95060fa07a5498ee7c41d9e6c5d5799a183d4518 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Fri, 10 Feb 2023 17:47:06 +0100 Subject: [PATCH 11/17] Temporarily fix path without schema and host specification Signed-off-by: Adam.Dybbroe --- cspp_runner/atms_rdr2sdr_runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py index 5886acd..643f717 100644 --- a/cspp_runner/atms_rdr2sdr_runner.py +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -138,7 +138,8 @@ def _get_output_messages(self, sdr_files, input_msg, orbit_number): dataset = [] for filepath in sdr_files: sdrfile = {} - sdrfile['uri'] = 'ssh://{host}/{path}'.format(host=self.host, path=filepath) + # sdrfile['uri'] = 'ssh://{host}/{path}'.format(host=self.host, path=filepath) + sdrfile['uri'] = '{path}'.format(path=filepath) sdrfile['uid'] = os.path.basename(filepath) dataset.append(sdrfile) From dc26f06a4e0b9bd8dc492aae4bedf173103eff4d Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Thu, 6 Apr 2023 13:20:43 +0200 Subject: [PATCH 12/17] Improve logging doing less info and more debug Signed-off-by: Adam.Dybbroe --- cspp_runner/atms_rdr2sdr_runner.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py index 643f717..fa4107c 100644 --- a/cspp_runner/atms_rdr2sdr_runner.py +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -83,7 +83,8 @@ def __init__(self, configfile): def _setup_and_start_communication(self): """Set up the Posttroll communication and start the publisher.""" - logger.debug("Starting up... Input topics:") + logger.info("Starting up ATMS DR Runner") + logger.debug("Input topics:") for top in self.input_topics: logger.debug("{topic}".format(topic=str(top))) @@ -309,7 +310,7 @@ def run_atms_from_message(posttroll_msg, sdr_call, sdr_options): t0_clock = time.process_time() t0_wall = time.time() - logger.info("Popen call arguments: " + str(cmdlist)) + logger.debug("Popen call arguments: " + str(cmdlist)) sdr_proc = subprocess.Popen(cmdlist, cwd=working_dir, env=my_env, @@ -319,13 +320,13 @@ def run_atms_from_message(posttroll_msg, sdr_call, sdr_options): line = sdr_proc.stdout.readline() if not line: break - logger.info(line.decode("utf-8").strip('\n')) + logger.debug(line.decode("utf-8").strip('\n')) while True: errline = sdr_proc.stderr.readline() if not errline: break - logger.info(errline.decode("utf-8").strip('\n')) + logger.debug(errline.decode("utf-8").strip('\n')) logger.info("Seconds process time: " + (str(time.process_time() - t0_clock))) logger.info("Seconds wall clock time: " + (str(time.time() - t0_wall))) From 517932a77ffe6ec1e976de7d382000b2140e5299 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Tue, 2 May 2023 11:08:24 +0200 Subject: [PATCH 13/17] Remove unused (commented out) code Signed-off-by: Adam.Dybbroe --- cspp_runner/atms_rdr2sdr_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py index fa4107c..8bc2ba3 100644 --- a/cspp_runner/atms_rdr2sdr_runner.py +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -139,7 +139,6 @@ def _get_output_messages(self, sdr_files, input_msg, orbit_number): dataset = [] for filepath in sdr_files: sdrfile = {} - # sdrfile['uri'] = 'ssh://{host}/{path}'.format(host=self.host, path=filepath) sdrfile['uri'] = '{path}'.format(path=filepath) sdrfile['uid'] = os.path.basename(filepath) dataset.append(sdrfile) From a29bd8433d9cbb87430d8c5e505f918caa23d76c Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Thu, 4 May 2023 14:22:50 +0200 Subject: [PATCH 14/17] Add template for atms cspp configuration Signed-off-by: Adam.Dybbroe --- examples/atms_dr_config.cfg_template | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 examples/atms_dr_config.cfg_template diff --git a/examples/atms_dr_config.cfg_template b/examples/atms_dr_config.cfg_template new file mode 100644 index 0000000..8c86d9c --- /dev/null +++ b/examples/atms_dr_config.cfg_template @@ -0,0 +1,19 @@ +[DEFAULT] +# Location to store Sensor Data Record (SDR) files after CSPP SDR processing +# is completed. +level1_home = /san1/polar_in/direct_readout/npp/atms/sdr/ + +working_dir = /san1/cspp/work + +# CSPP-atms batch script and parameters: +atms_sdr_call = atms_sdr.sh -a -d + +# Options to pass to the viirs_sdr_call +# see viirs_sdr.sh --help for explanation +atms_sdr_options = ['-a', '-d'] + +# Topic to use for publishing posttroll messages +publish_topic = /file/atms/sdr + +# Posttroll topics to listen to (comma separated) +subscribe_topics = /file/atms/rdr From 470e316990293f890dd037c2e4da575a0c1aeb4f Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Mon, 9 Oct 2023 10:31:49 +0200 Subject: [PATCH 15/17] Fix directory name from one of the three SDR files Signed-off-by: Adam.Dybbroe --- cspp_runner/atms_rdr2sdr_runner.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py index 8bc2ba3..daa085b 100644 --- a/cspp_runner/atms_rdr2sdr_runner.py +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -231,6 +231,7 @@ def create_subdir_from_filepaths(sdr_filepaths, sdr_file_patterns, sdr_home): orbit = 0 platform = 'unknown' + result = {} for filename in sdr_filepaths: bname = os.path.basename(str(filename)) logger.debug("SDR filename: %s", str(bname)) @@ -242,8 +243,9 @@ def create_subdir_from_filepaths(sdr_filepaths, sdr_file_patterns, sdr_home): stime = result.get('start_time') if stime and stime < start_time: start_time = stime - orbit = result.get('orbit', 0) - platform = PLATFORM_LONGNAMES.get(result['platform_shortname']) + + orbit = result.get('orbit', 0) + platform = PLATFORM_LONGNAMES.get(result.get('platform_shortname', 'unknown')) subdirname = "{platform}_{dtime:%Y%m%d_%H%M}_{orbit:05d}".format(platform=platform.lower().replace('-', ''), dtime=start_time, orbit=orbit) From 6205e1ad7a1f789049bc6e82cc6b45deb186e3d7 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Mon, 9 Oct 2023 10:37:24 +0200 Subject: [PATCH 16/17] Fix test Signed-off-by: Adam.Dybbroe --- cspp_runner/tests/test_run_atms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cspp_runner/tests/test_run_atms.py b/cspp_runner/tests/test_run_atms.py index d304c88..19660c8 100644 --- a/cspp_runner/tests/test_run_atms.py +++ b/cspp_runner/tests/test_run_atms.py @@ -74,7 +74,7 @@ def test_run_atms_from_message(caplog, monkeypatch, fake_cspp_workdir, sdr_call = 'atms_sdr.sh' sdr_options = ['-d', '-a'] - with caplog.at_level(logging.INFO): + with caplog.at_level(logging.DEBUG): dirpath = run_atms_from_message(fake_atms_posttroll_message, sdr_call, sdr_options) assert os.path.dirname(dirpath) == str(fake_cspp_workdir) From e9e399d1a17c364f2e5cfbce6854838d8207b3e2 Mon Sep 17 00:00:00 2001 From: "Adam.Dybbroe" Date: Wed, 13 Mar 2024 11:24:55 +0100 Subject: [PATCH 17/17] Add some more debug printouts Signed-off-by: Adam.Dybbroe --- cspp_runner/atms_rdr2sdr_runner.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cspp_runner/atms_rdr2sdr_runner.py b/cspp_runner/atms_rdr2sdr_runner.py index daa085b..516cb96 100644 --- a/cspp_runner/atms_rdr2sdr_runner.py +++ b/cspp_runner/atms_rdr2sdr_runner.py @@ -118,6 +118,10 @@ def run(self): sdr_filepaths = get_filepaths(wrkdir, msg.data, self.sdr_file_patterns) logger.debug("Files: %s", str(sdr_filepaths)) + if len(sdr_filepaths) == 0: + logger.warning("No ATMS files - cspp processing failed! " + + "No files to move. Continue.") + continue dest_sdr_files = move_files_to_destination(sdr_filepaths, self.sdr_file_patterns, self._sdr_home) @@ -272,6 +276,7 @@ def get_filepaths(directory, msg_data, file_patterns): glbstr = globify(pattern, mda) logger.debug("Glob-string = %s", str(glbstr)) + logger.debug("Directory = %s", str(directory)) flist = glob(os.path.join(directory, glbstr)) files = files + flist