Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hlavo/ingress/moist_profile/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/workdir/*
1 change: 1 addition & 0 deletions hlavo/ingress/moist_profile/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .data_filter import read_data, plot_filters, FilterParams
259 changes: 259 additions & 0 deletions hlavo/ingress/moist_profile/data_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# author: David Flanderka



import sys
from pathlib import Path
import pandas as pd
import numpy as np
import logging
import zarr_fuse as zf
from dataclasses import dataclass
from dotenv import load_dotenv
import xarray as xr
import polars as pl
import matplotlib
matplotlib.use('Agg') # for interactive graphs

import matplotlib.pyplot as plt
from matplotlib.colors import TwoSlopeNorm
from matplotlib.backends.backend_pdf import PdfPages

matplotlib.rcParams['hatch.linewidth'] = 6

logger = logging.getLogger(__name__)


@dataclass
class FilterParams:
    """Tuning coefficients for the moisture jump-detection filter.

    All three coefficients are consumed by ``_filter_jumps`` when building
    the per-sample jump threshold.
    """
    # Multiplier applied to the MAD of the diffs to obtain a robust sigma
    # estimate (callers pass 1.4826, the usual MAD-to-sigma constant).
    robust_sigma_coef: float
    # Multiplier of the local rolling-mean |diff| (local jump threshold).
    local_threshold_coef: float
    # Multiplier of the robust sigma (global jump threshold).
    global_threshold_coef: float

def _create_work_dir():
    """Ensure the ``workdir`` directory next to this module exists.

    Returns:
        Path: the work directory (created on first call, reused afterwards).
    """
    target = Path(__file__).parent / "workdir"
    target.mkdir(exist_ok=True)
    return target


def _open_zarr_schema():
    """Open the zarr_fuse store described by ``profile_schema.yaml``.

    Loads S3 credentials from the repository-root ``.secrets_env`` file
    into the environment first, then opens the store.

    Returns the root node of the opened store.
    """
    here = Path(__file__).parent
    secrets_file = here / "../../.." / ".secrets_env"
    load_dotenv(dotenv_path=secrets_file)

    return zf.open_store(here / "profile_schema.yaml")


def _filter_jumps(fparams: FilterParams, df: pd.DataFrame):
    """Detect sudden jumps in the 'moisture' time series and remove them.

    Input DataFrame must contain columns 'date_time' and 'moisture'.
    A jump is a diff that exceeds both a local threshold (multiple of the
    average |diff| in an 11-sample neighbourhood) and a global threshold
    (multiple of a MAD-based robust sigma of the diffs).

    Param fparams Threshold coefficients, see FilterParams
    Param df Input data (row order is preserved in the result)
    Returns a copy of df with added columns:
        'moisture_jump'        bool, True where a jump was detected
        'moisture_correction'  cumulative sum of detected jump sizes
        'moisture_filtered'    moisture with the cumulative jumps subtracted
    """
    required_columns = {"date_time", "moisture"}
    missing_columns = required_columns - set(df.columns)
    assert not missing_columns, f"Missing required columns: {sorted(missing_columns)}"

    if len(df) < 3:
        # Too few samples to estimate diff statistics; pass data through.
        result = df.copy()
        result["moisture_filtered"] = result["moisture"]
        result["moisture_jump"] = False
        result["moisture_correction"] = 0.0
        return result

    ordered = df.sort_values("date_time").copy()
    diff = ordered["moisture"].diff()
    abs_diff = diff.abs()

    # Local scale: rolling mean of |diff| around each sample; fall back to
    # the overall mean, then to 0.0, where the window yields NaN.
    neighbourhood_diff = abs_diff.rolling(window=11, center=True, min_periods=3).mean()
    neighbourhood_diff = neighbourhood_diff.fillna(abs_diff.mean())
    neighbourhood_diff = neighbourhood_diff.fillna(0.0)

    # Global robust scale via the median absolute deviation of the diffs.
    diff_center = diff.median(skipna=True)
    diff_mad = (diff - diff_center).abs().median(skipna=True)
    robust_sigma = fparams.robust_sigma_coef * diff_mad if pd.notna(diff_mad) else 0.0

    local_threshold = fparams.local_threshold_coef * neighbourhood_diff
    global_threshold = fparams.global_threshold_coef * robust_sigma
    # FIX: the previous `abs_diff.median() or 0.0` did not catch an
    # all-NaN series because NaN is truthy; handle NaN explicitly so the
    # 1e-9 floor is always honoured.
    abs_diff_median = abs_diff.median(skipna=True)
    if pd.isna(abs_diff_median):
        abs_diff_median = 0.0
    min_threshold = max(float(abs_diff_median), 1e-9)
    jump_threshold = np.maximum(local_threshold.to_numpy(), max(global_threshold, min_threshold))

    jump_mask = abs_diff.to_numpy() > jump_threshold
    jump_mask[0] = False  # first diff is NaN, never a jump
    jump_series = pd.Series(jump_mask, index=ordered.index)

    # Subtracting the running sum of jump sizes stitches the series back
    # together across every detected step.
    jump_diff = diff.where(jump_series, 0.0).fillna(0.0)
    cumulative_correction = jump_diff.cumsum()

    ordered["moisture_jump"] = jump_series
    ordered["moisture_correction"] = cumulative_correction
    ordered["moisture_filtered"] = ordered["moisture"] - cumulative_correction

    logging.getLogger(__name__).debug(
        "Detected %s moisture jumps for site=%s depth=%s",
        int(jump_series.sum()),
        ordered["site_id"].iloc[0] if "site_id" in ordered.columns else None,
        ordered["depth_level"].iloc[0] if "depth_level" in ordered.columns else None,
    )

    # Restore the caller's original row order.
    return ordered.sort_index()


def _plot_single_site_level(ds, site_id, depth_level):
    """Render a matplotlib figure comparing raw and filtered moisture.

    Param ds dataset of moisture data
    Param site_id Index of site
    Param depth_level Order of depth level
    Returns matplotlib Figure object
    """
    selection = ds.sel(site_id=site_id, depth_level=depth_level)
    assert "moisture_filtered" in selection, "Expected 'moisture_filtered' variable in dataset."

    figure, axis = plt.subplots(figsize=(10, 5))
    for variable in ("moisture", "moisture_filtered"):
        selection[variable].plot.line(ax=axis, x="date_time", label=variable)

    axis.set_title(f"Moisture data of site: '{site_id}', level: '{depth_level}'")
    axis.set_xlabel("Date")
    axis.set_ylabel("Moisture")
    axis.grid(True)
    axis.legend()
    return figure


def _plot_bokeh_site_level(ds, site_id, depth_level):
    """Build an interactive bokeh figure comparing raw and filtered moisture.

    Param ds dataset of moisture data
    Param site_id Index of site
    Param depth_level Order of depth level
    Returns bokeh figure object
    """
    selection = ds.sel(site_id=site_id, depth_level=depth_level)
    assert "moisture_filtered" in selection, "Expected 'moisture_filtered' variable in dataset."

    from bokeh.models import ColumnDataSource
    from bokeh.plotting import figure

    frame = selection[["moisture", "moisture_filtered"]].to_dataframe().reset_index()
    data_source = ColumnDataSource(frame)

    plot = figure(
        width=1000,
        height=500,
        x_axis_type="datetime",
        title=f"Moisture data of site: '{site_id}', level: '{depth_level}'",
        tools="pan,wheel_zoom,box_zoom,reset,save",
    )
    # One line per variable; the filtered curve is drawn in orange.
    for variable, extra_kwargs in (("moisture", {}), ("moisture_filtered", {"color": "orange"})):
        plot.line(
            x="date_time",
            y=variable,
            source=data_source,
            legend_label=variable,
            line_width=2,
            **extra_kwargs,
        )

    plot.xaxis.axis_label = "Date"
    plot.yaxis.axis_label = "Moisture"
    plot.grid.visible = True
    plot.legend.location = "top_left"

    return plot


def read_data(fparams: FilterParams, site_ids, depth_levels):
    """Read moisture data from zarr_fuse storage and apply the jump filter.

    Param fparams Filter coefficients, see FilterParams
    Param site_ids List of indexes of site
    Param depth_levels List of orders of depth levels
    Returns xarray.Dataset with an added 'moisture_filtered' variable
        (NaN where a site/level combination had no data)
    """
    root_node = _open_zarr_schema()
    profiles_node = root_node['Uhelna']['profiles']

    ds = profiles_node.dataset
    # Pre-allocate the output variable with NaN so untouched combinations
    # stay clearly marked as "not filtered".
    ds["moisture_filtered"] = xr.full_like(ds["moisture"], np.nan)

    for site_id in site_ids:
        for depth_level in depth_levels:
            selection = ds.sel(site_id=site_id, depth_level=depth_level)
            frame = selection.to_dataframe().reset_index()

            if frame.empty:
                continue

            filtered = _filter_jumps(fparams, frame)
            filtered_array = xr.DataArray(
                filtered["moisture_filtered"].to_numpy(),
                dims=selection["moisture"].dims,
                coords=selection["moisture"].coords,
            )
            ds["moisture_filtered"].loc[dict(site_id=site_id, depth_level=depth_level)] = filtered_array

    return ds


def plot_filters(ds, site_ids, depth_levels, out_file):
    """Plot raw vs. filtered moisture for every (site, depth) combination.

    Writes a multi-page PDF (one page per combination) and an interactive
    bokeh HTML file into the work directory.

    Param ds dataset of moisture data
    Param site_ids List of indexes of site
    Param depth_levels List of orders of depth levels
    Param out_file Base name of output files
    """
    from bokeh.io import output_file as bokeh_output_file, save as bokeh_save
    from bokeh.layouts import column

    workdir = _create_work_dir()
    pdf_path = workdir / f"{out_file}.pdf"
    html_path = workdir / f"{out_file}.html"
    bokeh_figures = []

    # FIX: use PdfPages as a context manager so the PDF is finalized even
    # when a plotting call raises (the old code leaked the open file).
    with PdfPages(pdf_path) as pdf:
        for site_id in site_ids:
            for depth_level in depth_levels:
                fig = _plot_single_site_level(ds, site_id, depth_level)
                pdf.savefig(fig, bbox_inches="tight")
                plt.close(fig)  # release figure memory between pages
                bokeh_figures.append(_plot_bokeh_site_level(ds, site_id, depth_level))

    if not bokeh_figures:
        logger.warning("No plots generated for output '%s'.", out_file)
        return

    bokeh_output_file(str(html_path))
    bokeh_save(column(*bokeh_figures))


def main():
    """Command-line entry point: filter all known sites/levels and print the dataset.

    FIX: read_data requires (fparams, site_ids, depth_levels); the previous
    no-argument call raised TypeError at runtime. Defaults mirror the
    values used by tests/ingress/moist_profile/test_data_filter.py.
    """
    filter_params = FilterParams(1.4826, 6.0, 8.0)
    site_ids = [1, 2, 3]
    depth_levels = [1, 2, 3, 4, 5]
    final_ds = read_data(filter_params, site_ids, depth_levels)
    print(final_ds)


if __name__ == "__main__":
    main()
4 changes: 2 additions & 2 deletions hlavo/ingress/moist_profile/profile_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ ATTRS:
# for the bucket name see the store_create.sh script.
S3_ENDPOINT_URL: "https://s3.cl4.du.cesnet.cz"
# note: STORE_URL: "s3://<bucket>>/<zarr_storage>"
# STORE_URL: "s3://hlavo-release/profiles.zarr"
STORE_URL: "s3://hlavo-testing/profiles.zarr"
STORE_URL: "s3://hlavo-release/profiles.zarr"
# STORE_URL: "s3://hlavo-testing/profiles.zarr"
# STORE_URL: "file:///home/paulie/Workspace/HLAVO/tests/ingress/moist_profile/test_storage/"


Expand Down
19 changes: 19 additions & 0 deletions tests/ingress/moist_profile/test_data_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# author: David Flanderka

import sys
from hlavo.ingress.moist_profile import data_filter


def test_data_filter():
    """Smoke test: run the jump filter over the live store and plot everything."""
    params = data_filter.FilterParams(1.4826, 6.0, 8.0)
    sites = [1, 2, 3]
    levels = [1, 2, 3, 4, 5]

    dataset = data_filter.read_data(params, sites, levels)
    print(dataset)
    data_filter.plot_filters(dataset, sites, levels, "plot_all")


if __name__ == "__main__":
    test_data_filter()
Loading