-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatch_multi_plot_spectrogram.py
More file actions
1356 lines (1238 loc) · 49.9 KB
/
batch_multi_plot_spectrogram.py
File metadata and controls
1356 lines (1238 loc) · 49.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Provides batch spectrogram plotting utilities.
Should work with CDFs like those from FAST (see batch_multi_plot_FAST_spectrograms.py) but should also be flexible with other data.
Assumed folder layout is::
{CDF_DATA_DIRECTORY}/year/month
Filenames in the month folders assumed to be in the following formats::
{??}_{??}_{??}_{instrument}_{timestamp}_{orbit}_v02.cdf (known "instruments" are ees, eeb, ies, or ieb)
{??}_{??}_orb_{orbit}_{??}.cdf
Examples::
FAST_data/2000/01/fa_esa_l2_eeb_20000101001737_13312_v02.cdf
FAST_data/2000/01/fa_k0_orb_13312_v01.cdf
"""
__authors__: list[str] = ["Ev Hansen"]
__contact__: str = "ephansen+gh@terpmail.umd.edu"
__credits__: list[list[str]] = [
["Ev Hansen", "Python code"],
["Emma Mirizio", "Co-Mentor"],
["Marilia Samara", "Co-Mentor"],
]
__date__: str = "2025-08-13"
__status__: str = "Development"
__version__: str = "0.0.1"
__license__: str = "GPL-3.0"
# Main imports for CDF data and plotting
import pandas as pd
import cdflib
import numpy as np
import matplotlib
matplotlib.use("Agg") # Use non-interactive backend for batch
from matplotlib.figure import Figure
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.dates import date2num
import matplotlib.dates as mdates
import matplotlib.colors as mcolors
from matplotlib import _pylab_helpers
from datetime import datetime, timezone
import os
from pathlib import Path
from tqdm import tqdm
from collections import defaultdict, deque
import json
import time as _time
import concurrent.futures
# garbage collection, parallel processing, and profiling
import gc
import concurrent.futures
import signal
import sys
# Section: Constants and Configuration
# Directory containing CDF data files
CDF_DATA_DIRECTORY = "./FAST_data/"
# List of variable names expected in CDF files
CDF_VARIABLE_NAMES = ["time_unix", "data", "energy", "pitch_angle"]
# Function to collapse 3D data arrays to 2D (e.g., sum over axis)
COLLAPSE_FUNCTION = np.nansum
# Colormaps for different axis scaling combinations (colorblind-friendly and visually distinct)
COLORMAP_LINEAR_Y_LINEAR_Z = "viridis"
COLORMAP_LINEAR_Y_LOG_Z = "cividis"
COLORMAP_LOG_Y_LINEAR_Z = "plasma"
COLORMAP_LOG_Y_LOG_Z = "inferno"
# Plot configuration
PLOT_FIGURE_WIDTH_INCHES = 6.25
PLOT_FIGURE_HEIGHT_INCHES = 2.0
TICK_LABEL_FONT_SIZE = 15
AXIS_LABEL_FONT_SIZE = 18
DEFAULT_ZOOM_WINDOW_MINUTES = 6  # Default zoom window duration in minutes
FILTERED_ORBITS_CSV_PATH = "./FAST_Cusp_Indices.csv"  # Path to filtered cusp orbits CSV
PLOTTING_PROGRESS_JSON_PATH = "./batch_multi_plot_progress.json"  # Path to JSON for tracking plotting progress across sessions
OUTPUT_BASE_DIRECTORY = "./plots/"  # Parent directory to save plots
# Logfile configuration for batch restarts.
# Reuse a previously persisted session timestamp when the marker file exists and
# is non-empty, so restarted batch runs keep appending to the same logfile.
# Otherwise generate a fresh timestamp and persist it for future restarts.
# (The original code duplicated the generate-and-persist branch in both arms of
# an if/else; this single fall-through covers missing AND empty marker files.)
LOGFILE_DATETIME_PATH = "./batch_multi_plot_logfile_datetime.txt"
LOGFILE_DATETIME_STRING = ""
if os.path.exists(LOGFILE_DATETIME_PATH):
    with open(LOGFILE_DATETIME_PATH, "r") as f:
        LOGFILE_DATETIME_STRING = f.read().strip()
if not LOGFILE_DATETIME_STRING:
    LOGFILE_DATETIME_STRING = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    with open(LOGFILE_DATETIME_PATH, "w") as f:
        f.write(LOGFILE_DATETIME_STRING)
LOGFILE_PATH = f"./batch_multi_plot_log_{LOGFILE_DATETIME_STRING}.log"
# Pitch angle category definitions for spectrogram plots
PITCH_ANGLE_CATEGORY_RANGES = {
    "downgoing": [(0, 30), (330, 360)],
    "upgoing": [(150, 210)],
    "perpendicular": [(40, 140), (210, 330)],
    "all": [(0, 360)],
}
# Global caches for batch optimization
filtered_orbits_cache = {}
orbit_column_cache = {}
cdf_type_cache = {}
# Section: Functions
# Section: Utility Functions
def load_filtered_orbits(csv_path=FILTERED_ORBITS_CSV_PATH):
    """Load the filtered orbits CSV, memoizing results per path.

    Parameters
    ----------
    csv_path : str, default FILTERED_ORBITS_CSV_PATH
        Path to the filtered orbits TSV/CSV file (tab-separated).

    Returns
    -------
    pandas.DataFrame or None
        DataFrame of filtered orbits, or ``None`` if loading fails.

    Notes
    -----
    Successful loads are stored in the module-level ``filtered_orbits_cache``
    dict (keyed by the path string) so repeated calls during batch runs avoid
    re-reading the file from disk. Failed loads are not cached.
    """
    cached_dataframe = filtered_orbits_cache.get(csv_path)
    if cached_dataframe is not None:
        return cached_dataframe
    try:
        loaded_dataframe = pd.read_csv(csv_path, sep="\t")
    except Exception as exc:
        log_error(f"Error loading CSV {csv_path}: {exc}")
        return None
    filtered_orbits_cache[csv_path] = loaded_dataframe
    return loaded_dataframe
# Section: SIGINT Handling
def _terminate_all_child_processes():
    """Best-effort terminate every child process of the current process.

    Returns
    -------
    None

    Notes
    -----
    :mod:`psutil` is imported lazily so the dependency is only needed when
    shutdown handling actually runs. Failures to terminate individual children
    are ignored: this is called during best-effort shutdown, where partial
    success is acceptable.
    """
    import psutil

    for child_process in psutil.Process().children(recursive=True):
        try:
            child_process.terminate()
        except Exception:
            # Ignore per-child termination failures; shutdown is best-effort.
            pass
def _sigint_handler(signum, frame):
    """Handle SIGINT: log, terminate child processes, and exit with status 1.

    Parameters
    ----------
    signum : int
        Signal number (required by the handler signature; unused).
    frame : FrameType or None
        Current execution frame (required by the handler signature; unused).

    Returns
    -------
    None
        Does not actually return; :func:`sys.exit` raises ``SystemExit``.
    """
    log_message("[INFO] SIGINT received. Terminating all child processes and exiting.")
    _terminate_all_child_processes()
    sys.exit(1)
# Section: Logging
# Buffered logging state: messages accumulate in memory and are written to the
# logfile in batches (see _flush_log_buffer) to reduce disk I/O in batch runs.
_LOG_BUFFER = []  # list[tuple[str, str]]: buffered (level, message) entries
_LOG_BATCH_SIZE = 10  # default batch size for buffered logging; configurable via configure_log_batch
def configure_log_batch(batch_size: int):
    """Set the buffered-logging batch size.

    Parameters
    ----------
    batch_size : int
        Number of log records to accumulate before an automatic flush.
        Values below 1 are clamped to 1 so flushing always remains possible.
    """
    global _LOG_BATCH_SIZE
    coerced_size = int(batch_size)
    _LOG_BATCH_SIZE = coerced_size if coerced_size >= 1 else 1
def _flush_log_buffer(force: bool = False):
    """Write buffered log messages to the logfile and clear the buffer.

    Parameters
    ----------
    force : bool, default False
        When True, flush even if the buffer has not reached the configured
        batch-size threshold.

    Notes
    -----
    An empty buffer is a no-op. A buffer below the threshold (and not forced)
    is left untouched. On write failure the error is echoed to the console as
    a last resort; the buffer is cleared in every flush attempt regardless.
    """
    if not _LOG_BUFFER:
        return
    if not force and len(_LOG_BUFFER) < _LOG_BATCH_SIZE:
        return
    try:
        with open(LOGFILE_PATH, "a") as logfile_out:
            for level, msg in _LOG_BUFFER:
                prefix = "[ERROR] " if level == "error" else ""
                logfile_out.write(f"{prefix}{msg}\n")
    except Exception as log_flush_exception:
        # Last-resort console output
        tqdm.write(f"[ERROR] Failed flushing log buffer: {log_flush_exception}")
    finally:
        _LOG_BUFFER.clear()
def log_message(message: str, force_flush: bool = False):
    """Buffer an informational log message.

    The message goes into the in-memory buffer; a flush to disk happens
    automatically once the configured batch size is reached, or immediately
    when ``force_flush`` is True.
    """
    _LOG_BUFFER.append(("info", message))
    _flush_log_buffer(force=force_flush)
def log_error(message: str, force_flush: bool = False):
    """Buffer an error log message, echoing it to the console immediately."""
    tqdm.write(f"[ERROR] {message}")
    _LOG_BUFFER.append(("error", message))
    _flush_log_buffer(force=force_flush)
def get_timestamps_for_orbit(
    filtered_orbits_dataframe, orbit_number, instrument_type, time_unix_array
):
    """Compute orbit boundary UNIX timestamps from filtered indices.

    Parameters
    ----------
    filtered_orbits_dataframe : pandas.DataFrame
        DataFrame containing filtered orbits and min/max indices per instrument.
    orbit_number : int
        Orbit number to look up.
    instrument_type : str
        Instrument type identifier (e.g., ``'ees'``, ``'ies'``).
    time_unix_array : numpy.ndarray
        1D array of UNIX timestamps for the instrument.

    Returns
    -------
    list of float
        Boundary UNIX timestamps for the orbit (one value for a degenerate span
        or two values for start/end). Returns an empty list on missing inputs,
        an unknown orbit, or non-integer indices.

    Notes
    -----
    Column names are resolved by substring match once per (DataFrame,
    instrument) pair and cached in the module-level ``orbit_column_cache``
    (keyed by ``id(dataframe)``) to avoid re-scanning columns in batch loops.
    """
    global orbit_column_cache
    dataframe = filtered_orbits_dataframe
    cache = orbit_column_cache
    if dataframe is None or instrument_type is None or time_unix_array is None:
        return []
    key = (id(dataframe), instrument_type)
    if key not in cache:
        orbit_column = next(col for col in dataframe.columns if "orbit" in col.lower())
        min_index_column = next(
            col
            for col in dataframe.columns
            if instrument_type in col.lower() and "min index" in col.lower()
        )
        max_index_column = next(
            col
            for col in dataframe.columns
            if instrument_type in col.lower() and "max index" in col.lower()
        )
        cache[key] = (orbit_column, min_index_column, max_index_column)
    # Unpack from the cache unconditionally so both the hit and miss paths read
    # the column names from the same place (the original mixed unpacked names
    # with cache[key][1]/[2] lookups).
    orbit_column, min_index_column, max_index_column = cache[key]
    row = dataframe[dataframe[orbit_column] == orbit_number]
    if row.empty:
        return []
    try:
        min_index = int(row.iloc[0][min_index_column])
        max_index = int(row.iloc[0][max_index_column])
    except (TypeError, ValueError):
        # Fixed message: the original said "using 0" but the function actually
        # skips the orbit by returning an empty list.
        log_message("[WARN] Non-integer indices found in orbit row; skipping orbit.")
        return []
    # Clamp both indices into the valid range of the time array.
    last_valid_index = len(time_unix_array) - 1
    min_index = max(0, min(min_index, last_valid_index))
    max_index = max(0, min(max_index, last_valid_index))
    if min_index == max_index:
        return [float(time_unix_array[min_index])]
    return [float(time_unix_array[min_index]), float(time_unix_array[max_index])]
def get_cdf_file_type(cdf_file_path: str):
    """Infer instrument type from a CDF file path.

    Parameters
    ----------
    cdf_file_path : str
        Path to the CDF file.

    Returns
    -------
    str or None
        Instrument type string (e.g., ``'ees'``), ``'orb'`` for orbit files,
        or ``None`` (after logging an error) if not recognized.
    """
    lowered_path = cdf_file_path.lower()
    # Orbit files take precedence over instrument tags.
    if "_orb_" in lowered_path:
        return "orb"
    # Instrument tags appear underscore-delimited in the filename.
    matched_tag = next(
        (tag for tag in ("ees", "eeb", "ies", "ieb") if f"_{tag}_" in lowered_path),
        None,
    )
    if matched_tag is None:
        log_error(f"Unknown CDF file type for path: {cdf_file_path}")
    return matched_tag
def get_variable_shape(cdf_path, variable_name):
    """Return the shape of a variable in a CDF file.

    Parameters
    ----------
    cdf_path : str
        Path to the CDF file.
    variable_name : str
        Variable name to inspect.

    Returns
    -------
    tuple or None
        Shape tuple of the variable, or ``None`` when the file is an orbit
        file, the type is unknown, the variable is not a numpy array, or
        reading fails.

    Notes
    -----
    The inferred file type is memoized in the module-level ``cdf_type_cache``
    keyed by path, so repeated shape queries on one file only infer once.
    """
    instrument_type = cdf_type_cache.get(cdf_path)
    if instrument_type is None:
        instrument_type = get_cdf_file_type(cdf_path)
        cdf_type_cache[cdf_path] = instrument_type
    # Orbit files and unrecognized files carry no spectrogram variables.
    if instrument_type is None or instrument_type == "orb":
        return None
    try:
        with cdflib.CDF(cdf_path) as cdf:
            variable_data = cdf.varget(variable_name)
        if isinstance(variable_data, np.ndarray):
            return variable_data.shape
        return None
    except Exception as exc:
        log_error(f"Error reading {cdf_path} for variable {variable_name}: {exc}")
        return None
def get_cdf_var_shapes(
    cdf_folder_path=CDF_DATA_DIRECTORY, variable_names=CDF_VARIABLE_NAMES
):
    """Collect shapes of variables across CDF files under a folder.

    Parameters
    ----------
    cdf_folder_path : str, default CDF_DATA_DIRECTORY
        Directory searched recursively for ``*.cdf`` files (case-insensitive
        extension match).
    variable_names : list of str, default CDF_VARIABLE_NAMES
        Variable names to inspect in each file.

    Returns
    -------
    dict
        Mapping from variable name to a list of shape tuples (or ``None``)
        in discovery order, one entry per CDF file.
    """
    cdf_paths = [str(found) for found in Path(cdf_folder_path).rglob("*.[cC][dD][fF]")]
    return {
        variable_name: [
            get_variable_shape(cdf_path, variable_name)
            for cdf_path in tqdm(
                cdf_paths,
                desc=f"Processing CDF files ({variable_name})",
                unit="file",
                total=len(cdf_paths),
            )
        ]
        for variable_name in variable_names
    }
def close_all_axes_and_clear(fig):
    """Close axes/subplots and clear a figure to free memory.

    Parameters
    ----------
    fig : matplotlib.figure.Figure
        Figure instance to clear and dispose.

    Returns
    -------
    None

    Notes
    -----
    Ensures axes are deleted, the canvas is closed/detached, and removes the figure
    from the global Gcf registry when possible to mitigate memory growth during
    large batch operations. Every step is best-effort: failures are logged and
    the remaining cleanup steps still run.
    """
    # Delete each axes individually; iterate a copy since delaxes mutates fig.axes.
    for axis in list(fig.axes):
        try:
            fig.delaxes(axis)
        except Exception as axis_close_error:
            log_error(f"Error closing axis: {axis_close_error}")
    fig.clf()
    if hasattr(fig, "canvas") and fig.canvas is not None:
        try:
            # NOTE(review): FigureCanvasBase does not define a close() method in
            # current matplotlib, so this call likely always raises and lands in
            # the warning branch below — confirm against the pinned matplotlib
            # version before relying on it.
            fig.canvas.close()
        except Exception as canvas_close_error:
            log_message(f"[WARN] Error closing canvas: {canvas_close_error}")
        try:
            # Break the canvas -> figure back-reference so the figure can be GC'd.
            fig.canvas.figure = None
        except Exception as canvas_figure_clear_error:
            log_message(
                f"[WARN] Error clearing canvas figure: {canvas_figure_clear_error}"
            )
        fig.canvas = None
    try:
        # Figures created via pyplot are registered in Gcf and kept alive there;
        # figures built directly from Figure() have no number and are skipped.
        if hasattr(fig, "number") and fig.number is not None:
            _pylab_helpers.Gcf.destroy(fig.number)
    except Exception as gcf_registry_error:
        log_error(f"Error removing figure from Gcf registry: {gcf_registry_error}")
# Section: Spectrogram Plotting
def make_spectrogram(
    x_axis_values,
    y_axis_values,
    data_array_3d,
    x_axis_min=None,
    x_axis_max=None,
    x_axis_is_unix=True,
    x_axis_label=None,
    center_timestamp=None,
    window_duration_seconds=None,
    y_axis_scale_function=None,
    y_axis_label=None,
    y_axis_min=0,
    y_axis_max=4000,
    z_axis_scale_function=None,
    z_axis_min=None,
    z_axis_max=None,
    z_axis_label=None,
    collapse_axis=1,
    colormap="viridis",
    axis_object=None,
    instrument_label=None,
    vertical_lines_unix=None,  # list of unix timestamps to mark
):
    """Plot a spectrogram by collapsing a 3D data array along an axis.

    Parameters
    ----------
    x_axis_values : array-like
        1D array for x (horizontal) axis (e.g., time sequence).
    y_axis_values : array-like
        1D array for y (vertical) axis (e.g., energy bins).
    data_array_3d : numpy.ndarray
        3D data array, e.g. ``(time, angle/pitch, energy)``.
    x_axis_min, x_axis_max : float, optional
        Explicit x-axis clipping bounds before plotting.
    x_axis_is_unix : bool, default True
        If ``True``, x-axis treated as UNIX seconds and converted to dates.
    x_axis_label : str, optional
        Custom x-axis label (default depends on ``x_axis_is_unix``).
    center_timestamp : float, optional
        Center of requested zoom window (UNIX seconds).
    window_duration_seconds : float, optional
        Duration of zoom window; both must be provided for zoom to apply.
    y_axis_scale_function : {'linear', 'log'}, optional
        Y-axis scaling; ``None`` behaves as ``'linear'``.
    y_axis_label : str, optional
        Y-axis label text.
    y_axis_min, y_axis_max : float, default 0, 4000
        Y-axis clipping range applied before filtering / plotting.
    z_axis_scale_function : {'linear', 'log'}, optional
        Color scale mode; ``None`` behaves as ``'linear'``.
    z_axis_min, z_axis_max : float, optional
        Optional color scale bounds (percentiles chosen if omitted).
    z_axis_label : str, optional
        Colorbar label text.
    collapse_axis : int, default 1
        Axis index along which to collapse the 3D data array.
    colormap : str, default 'viridis'
        Matplotlib colormap name.
    axis_object : matplotlib.axes.Axes, optional
        Existing axes to draw into; if ``None`` a new figure/axes created.
    instrument_label : str, optional
        Title string applied to the axes.
    vertical_lines_unix : list of float, optional
        UNIX timestamps to annotate with vertical lines.

    Returns
    -------
    axis_object : matplotlib.axes.Axes or None
        The axis object used for plotting (``None`` if no data plotted).
    x_axis_plot : numpy.ndarray or None
        X values actually used (possibly filtered / converted), or ``None`` if skipped.
    """
    # Log the function call and key parameters for debugging
    log_message(
        f"[DEBUG] make_spectrogram: y_axis_scale_function={y_axis_scale_function}, z_axis_scale_function={z_axis_scale_function}, z_axis_min={z_axis_min}, z_axis_max={z_axis_max}, colormap={colormap}"
    )
    # Convert input arrays to numpy arrays for consistency
    x_axis = np.asarray(x_axis_values)
    y_axis = np.asarray(y_axis_values)
    data_array = np.asarray(data_array_3d)
    # Collapse the 3D data array along the specified axis (e.g., sum over pitch angle)
    collapsed_matrix = COLLAPSE_FUNCTION(data_array, axis=collapse_axis)
    # Mask out columns that are all NaN and restrict to valid energy range
    nan_column_mask = ~np.all(np.isnan(collapsed_matrix), axis=0)
    valid_energy_mask = (y_axis >= y_axis_min) & (y_axis <= y_axis_max)
    combined_mask = nan_column_mask & valid_energy_mask
    collapsed_matrix = collapsed_matrix[:, combined_mask]
    y_axis = y_axis[combined_mask]
    if collapsed_matrix.size == 0 or y_axis.size == 0:
        log_message("[WARNING] All energy bins were filtered out. No data to plot.")
        return None, None
    # Ensure y-axis is increasing (for plotting)
    if y_axis[0] > y_axis[-1]:
        y_axis = y_axis[::-1]
        collapsed_matrix = collapsed_matrix[:, ::-1]
    # If a zoom window is specified, restrict to that window
    if center_timestamp is not None and window_duration_seconds is not None:
        half_window = window_duration_seconds / 2
        left_bound = center_timestamp - half_window
        right_bound = center_timestamp + half_window
        zoom_mask = (x_axis >= left_bound) & (x_axis <= right_bound)
        x_axis = x_axis[zoom_mask]
        collapsed_matrix = collapsed_matrix[zoom_mask, :]
    # Restrict to specified x-axis min/max if provided
    if x_axis_min is not None or x_axis_max is not None:
        x_mask = np.ones_like(x_axis, dtype=bool)
        if x_axis_min is not None:
            x_mask &= x_axis >= x_axis_min
        if x_axis_max is not None:
            x_mask &= x_axis <= x_axis_max
        x_axis = x_axis[x_mask]
        collapsed_matrix = collapsed_matrix[x_mask, :]
    # Convert x-axis to matplotlib date format if using unix timestamps
    if x_axis_is_unix:
        x_axis_datetime = np.array(
            [datetime.fromtimestamp(x, tz=timezone.utc) for x in x_axis]
        )
        x_axis_plot = date2num(x_axis_datetime)
        x_label = x_axis_label if x_axis_label is not None else "Time (UTC)"
    else:
        x_axis_plot = x_axis
        x_label = x_axis_label if x_axis_label is not None else "X"
    # Create a new figure and axis if not provided
    if axis_object is None:
        fig = Figure(figsize=(PLOT_FIGURE_WIDTH_INCHES, PLOT_FIGURE_HEIGHT_INCHES))
        # Attaching an Agg canvas enables later rendering/saving of the figure.
        canvas = FigureCanvas(fig)
        axis_object = fig.add_subplot(1, 1, 1)
    else:
        fig = axis_object.figure
    # Transpose matrix for plotting (so y-axis is vertical)
    matrix_plot = collapsed_matrix.T
    # Set x-axis limits to zoom window if specified, otherwise to full range
    if center_timestamp is not None and window_duration_seconds is not None:
        if x_axis_is_unix:
            left_num = float(
                date2num(
                    datetime.fromtimestamp(
                        center_timestamp - window_duration_seconds / 2, tz=timezone.utc
                    )
                )
            )
            right_num = float(
                date2num(
                    datetime.fromtimestamp(
                        center_timestamp + window_duration_seconds / 2, tz=timezone.utc
                    )
                )
            )
            axis_object.set_xlim(left_num, right_num)
        else:
            axis_object.set_xlim(
                center_timestamp - window_duration_seconds / 2,
                center_timestamp + window_duration_seconds / 2,
            )
    else:
        axis_object.set_xlim(x_axis_plot[0], x_axis_plot[-1])
    # If no data remains after filtering, skip plotting
    if matrix_plot.size == 0:
        log_message("[WARNING] No data to plot after filtering. Skipping plot.")
        return None, None
    # Set colorbar min/max if not provided
    if z_axis_min is None:
        z_axis_min = np.nanpercentile(matrix_plot, 1)
    if z_axis_max is None:
        z_axis_max = np.nanpercentile(matrix_plot, 99)
    # Find the smallest positive value for safe log scaling
    finite_positive = matrix_plot[np.isfinite(matrix_plot) & (matrix_plot > 0)]
    safe_vmin = np.nanmin(finite_positive) if finite_positive.size > 0 else 1e-10
    # Plot with log colorbar if requested, masking non-positive values
    if z_axis_scale_function == "log":
        if np.any(matrix_plot <= 0) or not (
            np.isfinite(z_axis_min)
            and np.isfinite(z_axis_max)
            and z_axis_min > 0
            and z_axis_max > 0
            and z_axis_max > z_axis_min
        ):
            log_message(
                "[WARNING] Non-positive values found in matrix for log colorbar. Masking to z_axis_min and enforcing log scale."
            )
        # Clamp vmin positive so LogNorm never sees a non-positive lower bound.
        z_axis_min = float(max(z_axis_min, safe_vmin, 1e-10))
        z_axis_max = float(z_axis_max)
        # Mask all non-positive and non-finite values for log scale
        matrix_plot = np.where(
            ~np.isfinite(matrix_plot) | (matrix_plot <= 0), z_axis_min, matrix_plot
        )
        norm = mcolors.LogNorm(vmin=z_axis_min, vmax=z_axis_max)
        im = axis_object.imshow(
            matrix_plot,
            aspect="auto",
            origin="lower",
            extent=(x_axis_plot[0], x_axis_plot[-1], y_axis[0], y_axis[-1]),
            cmap=colormap,
            norm=norm,
        )
        # Compute tick marks for every integer power of 10 in range
        min_exponent = int(np.floor(np.log10(z_axis_min)))
        max_exponent = int(np.ceil(np.log10(z_axis_max)))
        ticks = [
            10**i
            for i in range(min_exponent, max_exponent + 1)
            if z_axis_min <= 10**i <= z_axis_max
        ]
        log_message(f"[DEBUG] make_spectrogram: log colorbar ticks: {ticks}")

        def log_tick_formatter(value, position=None):
            # Label only exact powers of ten as $10^k$; everything else blank.
            if value <= 0:
                return ""
            exponent = int(np.log10(value))
            if np.isclose(value, 10**exponent):
                return f"$10^{{{exponent}}}$"
            return ""

        # Create the colorbar with custom ticks and formatter
        colorbar = fig.colorbar(
            im,
            ax=axis_object,
            label=z_axis_label if z_axis_label is not None else "Counts",
            ticks=ticks,
            format=log_tick_formatter,
        )
    else:
        # Linear colorbar: mask NaN and inf values, set vmin/vmax
        z_axis_min = float(z_axis_min)
        z_axis_max = float(z_axis_max)
        matrix_plot = np.where(np.isnan(matrix_plot), z_axis_min, matrix_plot)
        matrix_plot = np.where(np.isneginf(matrix_plot), z_axis_min, matrix_plot)
        matrix_plot = np.where(np.isposinf(matrix_plot), z_axis_max, matrix_plot)
        # Fall back to the data's own range if the requested bounds are unusable.
        if not (
            np.isfinite(z_axis_min)
            and np.isfinite(z_axis_max)
            and z_axis_max > z_axis_min
        ):
            z_axis_min = float(np.nanmin(matrix_plot))
            z_axis_max = float(np.nanmax(matrix_plot))
        im = axis_object.imshow(
            matrix_plot,
            aspect="auto",
            origin="lower",
            extent=(x_axis_plot[0], x_axis_plot[-1], y_axis[0], y_axis[-1]),
            cmap=colormap,
            vmin=z_axis_min,
            vmax=z_axis_max,
        )
        # Create the colorbar for linear scale
        colorbar = fig.colorbar(
            im,
            ax=axis_object,
            label=z_axis_label if z_axis_label is not None else "Counts",
        )
    # Set axis labels and title
    axis_object.set_xlabel(x_label)
    axis_object.set_ylabel(y_axis_label if y_axis_label is not None else "Energy (eV)")
    if instrument_label is not None:
        axis_object.set_title(instrument_label)
    # Configure y-axis ticks and scale
    if len(y_axis) >= 2:
        if y_axis_scale_function != "log":
            # For linear y-axis, set ticks at reasonable intervals
            # NOTE(review): this digit-based heuristic assumes y_axis_max
            # stringifies to at least two leading digit characters; a value
            # below 10 would raise IndexError on y_max_str[1], and a float
            # like 4000.0 inflates y_max_digits via the ".0" suffix — confirm
            # callers always pass multi-digit integers.
            y_max_str = str(y_axis_max)
            y_max_digits = len(y_max_str)
            y_first_digit = int(y_max_str[0])
            y_second_digit = int(y_max_str[1])
            if y_second_digit >= 5:
                step_size = 10**y_max_digits
                y_max_tick = (y_first_digit) * 10 ** (y_max_digits - 1)
            else:
                step_size = 10 ** (y_max_digits - 1)
                y_max_tick = (y_first_digit + 0.5) * 10 ** (y_max_digits - 1)
            yticks = [
                i
                for i in range(y_axis_min, int(y_max_tick) + 1, step_size)
                if (i / y_max_tick) <= 1.1
            ]
            if len(yticks) > 0:
                axis_object.set_yticks(yticks)
                axis_object.set_yticklabels([f"{int(e)}" for e in yticks])
        else:
            # For log y-axis, set scale to log
            axis_object.set_yscale("log")
    # Format x-axis as time if using unix timestamps
    if x_axis_is_unix:
        x_limits = axis_object.get_xlim()
        left_datetime = mdates.num2date(x_limits[0], tz=timezone.utc)
        right_datetime = mdates.num2date(x_limits[1], tz=timezone.utc)
        displayed_time_range_seconds = (right_datetime - left_datetime).total_seconds()
        # Show seconds only for short (< 2 minute) windows to avoid clutter.
        if displayed_time_range_seconds < 120:
            axis_object.xaxis.set_major_formatter(
                mdates.DateFormatter("%H:%M:%S", tz=timezone.utc)
            )
        else:
            axis_object.xaxis.set_major_formatter(
                mdates.DateFormatter("%H:%M", tz=timezone.utc)
            )
    # Draw vertical lines for orbit boundaries or other events if provided
    if vertical_lines_unix is not None and len(vertical_lines_unix) > 0:
        if x_axis_is_unix:
            vertical_lines_plot = date2num(
                [
                    datetime.fromtimestamp(timestamp, tz=timezone.utc)
                    for timestamp in vertical_lines_unix
                ]
            )
            # Keep only markers that fall inside the plotted x range.
            x_min_plot = x_axis_plot[0]
            x_max_plot = x_axis_plot[-1]
            vertical_lines_plot = [
                v for v in vertical_lines_plot if x_min_plot <= v <= x_max_plot
            ]
        else:
            vertical_lines_plot = [
                v for v in vertical_lines_unix if x_axis_plot[0] <= v <= x_axis_plot[-1]
            ]
        for vertical_line in vertical_lines_plot:
            # Draw a thick black line under a thinner red line for visibility
            axis_object.axvline(
                vertical_line,
                color="black",
                linestyle="-",
                linewidth=4,
                alpha=1.0,
                zorder=10,
            )
            axis_object.axvline(
                vertical_line,
                color="red",
                linestyle="-",
                linewidth=2,
                alpha=1.0,
                zorder=11,
            )
    # Set tick parameters for better readability
    axis_object.tick_params(
        axis="both", which="major", labelsize=TICK_LABEL_FONT_SIZE, length=8, width=1
    )
    axis_object.tick_params(
        axis="both", which="minor", labelsize=TICK_LABEL_FONT_SIZE, length=5, width=1
    )
    colorbar.ax.tick_params(labelsize=TICK_LABEL_FONT_SIZE, length=6, width=1)
    colorbar.ax.tick_params(
        which="minor", labelsize=TICK_LABEL_FONT_SIZE, length=3, width=1
    )
    # Set axis label font sizes
    axis_object.xaxis.label.set_fontsize(AXIS_LABEL_FONT_SIZE)
    axis_object.yaxis.label.set_fontsize(AXIS_LABEL_FONT_SIZE)
    # NOTE(review): this hardcodes "Counts" and overrides whatever z_axis_label
    # the colorbar was created with above — confirm whether intentional.
    colorbar.ax.set_ylabel("Counts", fontsize=AXIS_LABEL_FONT_SIZE)
    # Return the axis and the x-axis values used for plotting
    return axis_object, x_axis_plot
def generic_plot_spectrogram_set(
    datasets,
    collapse_axis=1,
    zoom_center=None,
    zoom_window_seconds=None,
    vertical_lines=None,
    x_is_unix=True,
    y_scale="linear",
    z_scale="linear",
    colormap="viridis",
    figure_title=None,
    show=False,
    y_min=None,
    y_max=None,
    z_min=None,
    z_max=None,
):
    """Plot a vertical stack of generic spectrograms, one row per dataset.

    Parameters
    ----------
    datasets : list of dict
        Each dict requires keys ``'x'``, ``'y'``, ``'data'`` and may include optional keys:
        ``'label'``, ``'y_label'``, ``'z_label'``, ``'y_min'``, ``'y_max'``, ``'z_min'``, ``'z_max'``.
    collapse_axis : int, default 1
        Axis index of the 3D array collapsed prior to plotting.
    zoom_center : float, optional
        Center (UNIX time) for zoom column when used.
    zoom_window_seconds : float, optional
        Duration of zoom window (seconds) when ``zoom_center`` provided.
    vertical_lines : list of float, optional
        UNIX timestamps to annotate with vertical lines.
    x_is_unix : bool, default True
        If ``True``, x values are treated as UNIX seconds and formatted.
    y_scale : {'linear', 'log'}, default 'linear'
        Y-axis scaling mode.
    z_scale : {'linear', 'log'}, default 'linear'
        Color (intensity) scale mode.
    colormap : str, default 'viridis'
        Matplotlib colormap name.
    figure_title : str, optional
        Figure-level title (sup-title).
    show : bool, default False
        If ``True``, display interactively (requires GUI backend).
    y_min, y_max : float, optional
        Global Y range fallbacks used when a dataset supplies none; a missing
        max is inferred from the dataset's ``'y'`` array when possible.
    z_min, z_max : float, optional
        Global colorbar bound fallbacks.

    Returns
    -------
    tuple
        ``(fig, canvas)`` or ``(None, None)`` if ``datasets`` is empty.
    """
    if not datasets:
        return None, None
    row_count = len(datasets)
    fig = Figure(figsize=(10, 3 * row_count))
    canvas = FigureCanvas(fig)
    for position, dataset in enumerate(datasets, start=1):
        subplot_axis = fig.add_subplot(row_count, 1, position)
        # Per-dataset limits win over the global fallbacks.
        row_y_min = dataset.get("y_min", y_min)
        row_y_max = dataset.get("y_max", y_max)
        row_z_min = dataset.get("z_min", z_min)
        row_z_max = dataset.get("z_max", z_max)
        # Infer the y maximum from the data itself when nothing was supplied.
        if row_y_max is None and dataset.get("y") is not None:
            row_y_max = dataset["y"].max()
        make_spectrogram(
            x_axis_values=dataset["x"],
            y_axis_values=dataset["y"],
            data_array_3d=dataset["data"],
            collapse_axis=collapse_axis,
            center_timestamp=zoom_center,
            window_duration_seconds=zoom_window_seconds,
            x_axis_is_unix=x_is_unix,
            y_axis_scale_function=y_scale,
            z_axis_scale_function=z_scale,
            y_axis_min=0 if row_y_min is None else row_y_min,
            y_axis_max=4000 if row_y_max is None else row_y_max,
            z_axis_min=row_z_min,
            z_axis_max=row_z_max,
            colormap=colormap,
            y_axis_label=dataset.get("y_label", "Energy (eV)"),
            z_axis_label=dataset.get("z_label", "Counts"),
            x_axis_label="Time (UTC)" if x_is_unix else dataset.get("x_label"),
            vertical_lines_unix=vertical_lines,
            axis_object=subplot_axis,
        )
        if dataset.get("label"):
            subplot_axis.set_title(dataset["label"])
    if figure_title:
        fig.suptitle(figure_title)
    fig.tight_layout(rect=(0, 0, 1, 0.97))
    if show:
        import matplotlib.pyplot as plt

        plt.show()
    return fig, canvas
def generic_batch_plot(
items,
output_dir,
build_datasets_fn,
zoom_center_fn=None,
zoom_window_seconds=None,
vertical_lines_fn=None,
y_scale="linear",
z_scale="linear",
colormap="viridis",
max_workers=2,
progress_json_path: str = PLOTTING_PROGRESS_JSON_PATH,
ignore_progress_json: bool = False,
flush_batch_size: int = 10,
log_flush_batch_size: int | None = None,
install_signal_handlers: bool = True,
):
"""Generic batch runner for plotting datasets.
Parameters
----------
items : iterable
Iterable of item identifiers (any ``repr``-able objects).
output_dir : str
Base output directory; plots saved under ``output_dir/<item>/generic.png``.
build_datasets_fn : callable
Callable returning ``list[dict]`` describing datasets for an item.
zoom_center_fn : callable, optional
Callable mapping item -> center UNIX time (or ``None``) for zoom.
zoom_window_seconds : float, optional
Duration of zoom window in seconds.
vertical_lines_fn : callable, optional
Callable mapping item -> list[float] UNIX timestamps (or ``None``).
y_scale : {'linear', 'log'}, default 'linear'
Y-axis scaling for all rows.
z_scale : {'linear', 'log'}, default 'linear'
Color scaling for all rows.
colormap : str, default 'viridis'
Matplotlib colormap name.
max_workers : int, default 2
Number of parallel worker processes.
progress_json_path : str, default PLOTTING_PROGRESS_JSON_PATH
Path to progress JSON (resumable state). Created/updated as needed.
ignore_progress_json : bool, default False
If ``True``, skip reading existing progress prior to execution.
flush_batch_size : int, default 10
Progress/log batch size; values < 1 coerced to 1. Final partial batch flushed.
log_flush_batch_size : int, optional
Explicit log batch size; if ``None`` reuse ``flush_batch_size``.
install_signal_handlers : bool, default True
When True, a temporary SIGINT handler is installed (restored on exit) to
enable graceful interruption (progress & log flush). Set False in embedded
environments if altering the global handler causes side-effects.
Returns
-------
list of tuple
Sequence of ``(item, status)`` with ``status`` in {``'ok'``, ``'no_data'``, ``'error'``}.
Notes
-----
* Logging is buffered and force-flushed at completion.
* Progress JSON contains simple lists of completed, error, and no-data items.
* Items are identified via ``repr(item)`` for data-agnostic persistence.
"""
os.makedirs(output_dir, exist_ok=True)
previous_sigint = None
if install_signal_handlers:
try:
previous_sigint = signal.getsignal(signal.SIGINT)