Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 45 additions & 34 deletions computing/drought/merge_layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,30 @@ def merge_yearly_layers(
print(asset_suffix)
print(asset_folder_list)
print(f"merge yearly layers {app_type}")
# Create required GEE asset path components

ee_initialize(gee_account_id)
gee_obj = GEEAccount.objects.get(pk=gee_account_id)
helper_account_path = build_gee_helper_paths(app_type, gee_obj.helper_account.name)
# Create export asset path (must be constant for export)
description = f"drought_{asset_suffix}" # _{start_year}_{end_year}"
print(description)

description = f"drought_{asset_suffix}"

gee_asset = get_gee_dir_path(
asset_folder_list, asset_path=GEE_PATHS[app_type]["GEE_ASSET_PATH"]
)
asset_id = f"{gee_asset}{description}"

print(description)
print(asset_id)

# Check if asset already exists
if is_gee_asset_exists(asset_id):
ee.data.deleteAsset(asset_id)
# return None

def get_collection_path(year: int) -> str:
"""Get the full path for a year's collection."""
asset_path = GEE_PATHS[app_type]["GEE_ASSET_PATH"]
return f"{get_gee_dir_path(asset_folder_list, asset_path=asset_path)}drought_{asset_suffix}_{year}_v2"

# Get base feature collection
# Base collection
first_year_fc = ee.FeatureCollection(get_collection_path(start_year))

geometries_with_ids = first_year_fc.map(
lambda f: ee.Feature(
f.geometry(),
Expand All @@ -96,20 +95,25 @@ def get_collection_path(year: int) -> str:
)

def merge_year_data(feature):
"""Server-side function to merge data from all years."""
uid = feature.get("uid")

def process_year(prev_feature, year):
"""Process a single year's data."""
feat = ee.Feature(prev_feature)

# Get year's collection
year_fc = ee.FeatureCollection(get_collection_path(year))

filtered = year_fc.filter(ee.Filter.equals("uid", uid))

# ✅ SAFE feature
year_feature = ee.Feature(
year_fc.filter(ee.Filter.equals("uid", uid)).first()
ee.Algorithms.If(
filtered.size().gt(0), filtered.first(), ee.Feature(None, {})
)
)

# Base property names
# -------------------------
# Property lists
# -------------------------
base_props = [
"drought_labels_" + str(year),
"dryspell_length_" + str(year),
Expand All @@ -131,7 +135,6 @@ def process_year(prev_feature, year):
"total_weeks_" + str(year),
]

# Base property names
renamed_props = [
"drlb_" + str(year),
"drysp_" + str(year),
Expand All @@ -153,19 +156,23 @@ def process_year(prev_feature, year):
"t_wks_" + str(year),
]

prop_names = ee.List(base_props)

renamed_prop_names = ee.List(renamed_props)
# -------------------------
# SAFE getter
# -------------------------
def safe_get(prop):
return ee.Algorithms.If(
year_feature.get(prop), year_feature.get(prop), None
)

# Get property values
prop_values = prop_names.map(lambda x: year_feature.get(x))
prop_values = ee.List(base_props).map(lambda x: safe_get(x))

# Create base properties dictionary
base_dict = ee.Dictionary.fromLists(renamed_prop_names, prop_values)
base_dict = ee.Dictionary.fromLists(ee.List(renamed_props), prop_values)

all_properties = ee.Feature(year_feature).propertyNames()
# -------------------------
# Rainfall properties
# -------------------------
all_properties = year_feature.propertyNames()

# Filter properties that start with the prefix
orig_rainfall_names = all_properties.filter(
ee.Filter.stringStartsWith("item", "monthly_rainfall_deviation_")
)
Expand All @@ -175,19 +182,25 @@ def rename_property(prop):

renamed_rainfall_names = orig_rainfall_names.map(rename_property)

# Get rainfall values
rainfall_values = orig_rainfall_names.map(lambda x: year_feature.get(x))
rainfall_values = orig_rainfall_names.map(lambda x: safe_get(x))

# Create rainfall properties dictionary
rainfall_dict = ee.Dictionary.fromLists(
renamed_rainfall_names, rainfall_values
)

# Adding dry_spell length values from previous years with current year
total_dryspell = ee.Number(feat.get("avg_dryspell")).add(
ee.Number(year_feature.get("dryspell_length_" + str(year)))
# -------------------------
# SAFE dryspell
# -------------------------
dryspell_val = ee.Number(
ee.Algorithms.If(
year_feature.get("dryspell_length_" + str(year)),
year_feature.get("dryspell_length_" + str(year)),
0,
)
)

total_dryspell = ee.Number(feat.get("avg_dryspell")).add(dryspell_val)

return (
feat.set(base_dict)
.set(rainfall_dict)
Expand All @@ -196,21 +209,19 @@ def rename_property(prop):

feature = feature.set("avg_dryspell", 0)

# Process all years
years = range(start_year, end_year + 1)
processed_feature = reduce(process_year, years, feature)

num_years = ee.Number(end_year).subtract(start_year).add(1)

avg_dryspell = ee.Number(processed_feature.get("avg_dryspell")).divide(
num_years
)

# Set average dryspell and remove total
return processed_feature.set("avg_dryspell", avg_dryspell)

# Process all features
merged_fc = geometries_with_ids.map(merge_year_data)

# Export feature collection to GEE
task_id = export_vector_asset_to_gee(merged_fc, description, asset_id)

return task_id
88 changes: 64 additions & 24 deletions computing/lulc/lulc_v3.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import time
from datetime import timedelta
import re
import ee
from dateutil.relativedelta import relativedelta

Expand All @@ -17,6 +18,7 @@
is_gee_asset_exists,
get_gee_dir_path,
gcs_file_exists,
delete_gcs_raster_files,
)
from nrm_app.celery import app
from .cropping_frequency import *
Expand All @@ -29,6 +31,30 @@
from computing.STAC_specs import generate_STAC_layerwise


def _parse_lulc_filename_for_years(final_output_filename: str):
"""
final_output_filename format (created in clip_lulc_v3):
<prefix>_<startYYYY>-<startMM>-<startDD>_<endYYYY>-<endMM>-<endDD>_LULCmap_<scale>

We use regex instead of split('_20') because the prefix may contain substrings like '_205'
which break naive year parsing (e.g. producing LULC_5_24_...).
"""
# Capture prefix + the two YYYY values around the date segments.
m = re.match(
r"^(?P<prefix>.*)_(?P<start>\d{4})-\d{2}-\d{2}_(?P<end>\d{4})-\d{2}-\d{2}_",
final_output_filename,
)
if not m:
raise ValueError(
f"Unexpected LULC filename format: '{final_output_filename}'. "
"Expected <prefix>_<YYYY-MM-DD>_<YYYY-MM-DD>_LULCmap_<scale>."
)
prefix = m.group("prefix")
s_year = m.group("start")[2:]
e_year = m.group("end")[2:]
return prefix, s_year, e_year


@app.task(bind=True)
def clip_lulc_v3(
self,
Expand All @@ -42,6 +68,7 @@ def clip_lulc_v3(
asset_folder=None,
asset_suffix=None,
app_type="MWS",
force_regenerate=False,
):
ee_initialize(gee_account_id)
print("Inside lulc_river_basin")
Expand Down Expand Up @@ -76,16 +103,17 @@ def clip_lulc_v3(
final_output_assetid_array_new = []

layer_obj = None
try:
layer_obj = get_layer_object(
state,
district,
block,
layer_name=f"LULC_17_18_{filename_prefix}_level_3",
dataset_name="LULC_level_3",
)
except Exception as e:
print("DB layer not found for lulc.")
if not force_regenerate:
try:
layer_obj = get_layer_object(
state,
district,
block,
layer_name=f"LULC_17_18_{filename_prefix}_level_3",
dataset_name="LULC_level_3",
)
except Exception as e:
print("DB layer not found for lulc.")

new_loop_start = None
if layer_obj:
Expand Down Expand Up @@ -124,13 +152,21 @@ def clip_lulc_v3(
pan_india = ee.Image(
f"projects/corestack-datasets/assets/datasets/LULC_v3_river_basin/pan_india_lulc_v3_{curr_start_year}_{curr_end_year}"
)
# clipToCollection expects a FeatureCollection; we have geometry here.
clipped_lulc = pan_india.clip(roi.geometry())
l1_asset_new.append(clipped_lulc)

task_list = []
geometry = roi.geometry()
if not is_gee_asset_exists(final_output_assetid_array_new[len(l1_asset_new) - 1]):
if force_regenerate or not is_gee_asset_exists(
final_output_assetid_array_new[len(l1_asset_new) - 1]
):
for i in range(0, len(l1_asset_new)):
if force_regenerate and is_gee_asset_exists(final_output_assetid_array_new[i]):
try:
ee.data.deleteAsset(final_output_assetid_array_new[i])
except Exception as e:
print("Error deleting existing LULC asset during force regenerate:", e)
if (
is_gee_asset_exists(final_output_assetid_array_new[i])
and len(l1_asset_new) <= 2
Expand Down Expand Up @@ -191,6 +227,7 @@ def clip_lulc_v3(
final_output_filename_array_new,
final_output_assetid_array_new,
scale,
force_regenerate=force_regenerate,
)

layer_at_geoserver = sync_lulc_to_geoserver(
Expand All @@ -206,17 +243,22 @@ def clip_lulc_v3(


def sync_lulc_to_gcs(
final_output_filename_array_new, final_output_assetid_array_new, scale
final_output_filename_array_new,
final_output_assetid_array_new,
scale,
force_regenerate=False,
):
task_ids = []
for i in range(0, len(final_output_assetid_array_new)):
make_asset_public(final_output_assetid_array_new[i])
image = ee.Image(final_output_assetid_array_new[i])
name_arr = final_output_filename_array_new[i].split("_20")
s_year = name_arr[1][:2]
e_year = name_arr[2][:2]
layer_name = "LULC_" + s_year + "_" + e_year + "_" + name_arr[0]
if not gcs_file_exists(layer_name):
prefix, s_year, e_year = _parse_lulc_filename_for_years(
final_output_filename_array_new[i]
)
layer_name = "LULC_" + s_year + "_" + e_year + "_" + prefix
if force_regenerate:
delete_gcs_raster_files(layer_name)
if force_regenerate or not gcs_file_exists(layer_name):
task_ids.append(sync_raster_to_gcs(image, scale, layer_name))

task_id_list = check_task_status(task_ids)
Expand All @@ -235,12 +277,10 @@ def sync_lulc_to_geoserver(
lulc_workspaces = ["LULC_level_1", "LULC_level_2", "LULC_level_3"]
layer_at_geoserver = False
for i in range(0, len(final_output_filename_array_new)):
name_arr = final_output_filename_array_new[i].split(
"_20"
) # TODO: better logic than this
s_year = name_arr[1][:2]
e_year = name_arr[2][:2]
gcs_file_name = "LULC_" + s_year + "_" + e_year + "_" + name_arr[0]
prefix, s_year, e_year = _parse_lulc_filename_for_years(
final_output_filename_array_new[i]
)
gcs_file_name = "LULC_" + s_year + "_" + e_year + "_" + prefix
print("Syncing " + gcs_file_name + " to geoserver")
for workspace in lulc_workspaces:
suff = workspace.replace("LULC", "")
Expand All @@ -265,7 +305,7 @@ def sync_lulc_to_geoserver(
)
if res and layer_ids:
update_layer_sync_status(layer_id=layer_ids[i], sync_to_geoserver=True)
print("STAC: Name array check", name_arr[1])
print("STAC: Parsed start year", s_year)

if workspace == "LULC_level_3":
start_year_STAC = "20" + str(
Expand Down
Loading