From 203df6932b8b7e61a2489c6a22267a522ea7d3b2 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Mon, 19 Dec 2022 16:50:50 -0600 Subject: [PATCH 01/13] Add test script to process nwm forcing into ngen input --- ngen_forcing/process_nwm_forcing.py | 68 +++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 ngen_forcing/process_nwm_forcing.py diff --git a/ngen_forcing/process_nwm_forcing.py b/ngen_forcing/process_nwm_forcing.py new file mode 100644 index 0000000..a9e1ec8 --- /dev/null +++ b/ngen_forcing/process_nwm_forcing.py @@ -0,0 +1,68 @@ +# import rioxarray as rxr +import xarray as xr +import geopandas as gpd +from rasterstats import zonal_stats +# import rasterio +import pandas as pd + +import warnings +warnings.simplefilter("ignore") + +# Read forcing files +# Generate List of files + +# TODO: Add looping through lists of forcing files +# consider looking at the "listofnwmfilenames.py" in the data_access_examples repository. +# Integer values for runinput, varinput, etc. are listed at the top of the file +# and an example is given in the `main` function. + +# import listofnwmfilenames +# create_file_list( +# runinput, +# varinput, +# geoinput, +# meminput, +# start_date, +# end_date, +# fcst_cycle, +# ) + +#for file in list_of_files: +fc_nc = ("nwm.t00z.medium_range.forcing.f001.conus.nc") +fc_xds = xr.open_dataset(fc_nc) + +reng = "rasterio" +fc_xds = xr.open_dataset(fc_nc, engine=reng) + +# Read basin boundary file +f_03 = "03w/nextgen_03W.gpkg" + +gpkg_03w_divides = gpd.read_file(f_03, layer="divides") + +list_of_vars = [ + "U2D" , + "V2D" , + "LWDOWN" , + "RAINRATE", + "T2D" , + "Q2D" , + "PSFC" , + "SWDOWN", +] + +df_dict = {} +for _v in list_of_vars: + with xr.open_dataset(fc_nc, engine=reng) as _xds: + _src = _xds[_v] + _aff2 = _src.rio.transform() + _arr2 = _src.values[0] + _df_zonal_stats = pd.DataFrame(zonal_stats(gpkg_03w_divides, _arr2, affine=_aff2)) + + df_dict[_v] = pd.DataFrame(index=_df_zonal_stats.index) + # adding statistics back to original GeoDataFrame + # gdf3 = pd.concat([gpkg_03w_divides, _df_zonal_stats], axis=1) + df_dict[_v][fc_xds.time.values[0]]=_df_zonal_stats["mean"] + +# TODO: Convert the output to CSV with something like +# `gdf3.to_csv` + From 7d1b50d45d9ec7f41cc94bd87f7c7be80e91da03 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Tue, 20 Dec 2022 12:11:54 -0600 Subject: [PATCH 02/13] rewrite to include file-loop --- ngen_forcing/process_nwm_forcing.py | 52 +++++++++++++++++++---------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/ngen_forcing/process_nwm_forcing.py b/ngen_forcing/process_nwm_forcing.py index a9e1ec8..d3a3389 100644 --- a/ngen_forcing/process_nwm_forcing.py +++ b/ngen_forcing/process_nwm_forcing.py @@ -5,6 +5,7 @@ # import rasterio import pandas as pd +from pathlib import Path import warnings warnings.simplefilter("ignore") @@ -27,18 +28,23 @@ # fcst_cycle, # ) -#for file in list_of_files: -fc_nc = ("nwm.t00z.medium_range.forcing.f001.conus.nc") -fc_xds = xr.open_dataset(fc_nc) +""" +A set of test files can be generated downloading these files +wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f001.conus.nc +wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f002.conus.nc +wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f003.conus.nc +""" -reng = "rasterio" -fc_xds = xr.open_dataset(fc_nc, engine=reng) +folder_prefix = Path('data') +list_of_files = [ + "nwm.t12z.medium_range.forcing.f001.conus.nc", + "nwm.t12z.medium_range.forcing.f002.conus.nc", + "nwm.t12z.medium_range.forcing.f003.conus.nc", +] # Read basin boundary file f_03 = "03w/nextgen_03W.gpkg" - -gpkg_03w_divides = gpd.read_file(f_03, layer="divides") - +gpkg_divides = gpd.read_file(f_03, layer="divides") list_of_vars = [ "U2D" , "V2D" , @@ -52,16 +58,26 @@ df_dict = {} for _v in list_of_vars: - with xr.open_dataset(fc_nc, engine=reng) as _xds: - _src = _xds[_v] - _aff2 = _src.rio.transform() - _arr2 = _src.values[0] - _df_zonal_stats = pd.DataFrame(zonal_stats(gpkg_03w_divides, _arr2, affine=_aff2)) - - df_dict[_v] = pd.DataFrame(index=_df_zonal_stats.index) - # adding statistics back to original GeoDataFrame - # gdf3 = pd.concat([gpkg_03w_divides, _df_zonal_stats], axis=1) - df_dict[_v][fc_xds.time.values[0]]=_df_zonal_stats["mean"] + df_dict[_v] = pd.DataFrame(index=gpkg_divides.index) + +reng = "rasterio" +sum_stat = "mean" + +for _nc_file in list_of_files: + # _nc_file = ("nwm.t00z.medium_range.forcing.f001.conus.nc") + _full_nc_file = (folder_prefix.joinpath(_nc_file)) + + with xr.open_dataset(_full_nc_file, engine=reng) as _xds: + for _v in list_of_vars: + _src = _xds[_v] + _aff2 = _src.rio.transform() + _arr2 = _src.values[0] + + breakpoint() + _df_zonal_stats = pd.DataFrame(zonal_stats(gpkg_divides, _arr2, affine=_aff2)) + # if adding statistics back to original GeoDataFrame + # gdf3 = pd.concat([gpkg_divides, _df_zonal_stats], axis=1) + df_dict[_v][_xds.time.values[0]]=_df_zonal_stats[sum_stat] # TODO: Convert the output to CSV with something like # `gdf3.to_csv` From 3cf0421058c99ea3109f1b4fb7d07f15b75dcfc2 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Tue, 20 Dec 2022 12:16:08 -0600 Subject: [PATCH 03/13] include gpkg download in example --- ngen_forcing/process_nwm_forcing.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ngen_forcing/process_nwm_forcing.py b/ngen_forcing/process_nwm_forcing.py index d3a3389..d7d86ae 100644 --- a/ngen_forcing/process_nwm_forcing.py +++ b/ngen_forcing/process_nwm_forcing.py @@ -33,6 +33,7 @@ wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f001.conus.nc wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f002.conus.nc wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f003.conus.nc +wget -P 03w -c https://nextgen-hydrofabric.s3.amazonaws.com/v1.2/nextgen_03W.gpkg """ folder_prefix = Path('data') From a7509f5df3d1b5863a5cfeb89469e6c0b94d9f76 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Thu, 22 Dec 2022 12:17:22 -0600 Subject: [PATCH 04/13] add optimized loop and port to functions --- .../TestConvert_NWMForcing_to_Ngen.py | 178 ++++++++++++++++++ ngen_forcing/defs.py | 18 ++ ngen_forcing/process_nwm_forcing.py | 167 +++++++++++----- 3 files changed, 319 insertions(+), 44 deletions(-) create mode 100644 ngen_forcing/TestConvert_NWMForcing_to_Ngen.py create mode 100644 ngen_forcing/defs.py diff --git a/ngen_forcing/TestConvert_NWMForcing_to_Ngen.py b/ngen_forcing/TestConvert_NWMForcing_to_Ngen.py new file mode 100644 index 0000000..8f4158f --- /dev/null +++ b/ngen_forcing/TestConvert_NWMForcing_to_Ngen.py @@ -0,0 +1,178 @@ +from defs import xr_read_window, polymask +from rasterio import _io, windows +import concurrent.futures +import xarray as xr + + +class MemoryDataset(_io.MemoryDataset, windows.WindowMethodsMixin): + pass + + +def junk(): + + flist = gpkg_divides.geometry.to_list() + polys = flist + + +def get_forcing_dict_newway( + feature_list, + folder_prefix, + file_list, +): + + reng = "rasterio" + _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) + _template_arr = _xds.U2D.values + _u2d = MemoryDataset( + _template_arr, + transform=_xds.U2D.rio.transform(), + gcps=None, + rpcs=None, + crs=None, + copy=False, + ) + + filehandles = [xr.open_dataset("data/" + f) for f in file_list] + stats = [] + for i, m in enumerate(map(polymask(_u2d), feature_list)): + print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") + mask, _, window = m + mask = xr.DataArray(mask, dims=["y", "x"]) + winslices = dict(zip(["y", "x"], window.toslices())) + for f in filehandles: + cropped = xr_read_window(f, winslices, mask=mask) + stats.append(cropped.mean()) + [f.close() for f in filehandles] # Returns None for each file + + return stats + + +def get_forcing_dict_newway_parallel( + feature_list, + folder_prefix, + file_list, + ): + + reng = "rasterio" + _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) + _template_arr = _xds.U2D.values + + _u2d = MemoryDataset( + _template_arr, + transform=_xds.U2D.rio.transform(), + gcps=None, + rpcs=None, + crs=None, + copy=False, + ) + filehandles = [xr.open_dataset("data/" + f) for f in file_list] + + with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor: + # with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + stats = [] + future_list = [] + + for i, m in enumerate(map(polymask(_u2d), feature_list)): + print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") + mask, _, window = m + mask = xr.DataArray(mask, dims=["y", "x"]) + winslices = dict(zip(["y", "x"], window.toslices())) + for f in filehandles: + future = executor.submit(xr_read_window, f, winslices, mask=mask) + # cropped = xr_read_window(f, winslices, mask=mask) + # stats.append(cropped.mean()) + future_list.append(future) + for _f in concurrent.futures.as_completed(future_list): + stats.append(_f.result().mean()) + + [f.close() for f in filehandles] + return stats + + +def get_forcing_dict_newway_inverted( + feature_list, + folder_prefix, + file_list, +): + + reng = "rasterio" + _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) + _template_arr = _xds.U2D.values + _u2d = MemoryDataset( + _template_arr, + transform=_xds.U2D.rio.transform(), + gcps=None, + rpcs=None, + crs=None, + copy=False, + ) + + filehandles = [xr.open_dataset("data/" + f) for f in file_list] + stats = [] + future_list = [] + mask_win_list = [] + + for i, m in enumerate(map(polymask(_u2d), feature_list)): + print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") + mask, _, window = m + mask = xr.DataArray(mask, dims=["y", "x"]) + winslices = dict(zip(["y", "x"], window.toslices())) + mask_win_list.append((mask, winslices)) + + for f in filehandles: + print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r") + for _m, _w in mask_win_list: + cropped = xr_read_window(f, _w, mask=_m) + stats.append(cropped.mean()) + + [f.close() for f in filehandles] + return stats + + +def get_forcing_dict_newway_inverted_parallel( + feature_list, + folder_prefix, + file_list, + ): + + import concurrent.futures + + reng = "rasterio" + _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) + _template_arr = _xds.U2D.values + _u2d = MemoryDataset( + _template_arr, + transform=_xds.U2D.rio.transform(), + gcps=None, + rpcs=None, + crs=None, + copy=False, + ) + + mask_win_list = [] + for i, m in enumerate(map(polymask(_u2d), feature_list)): + print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") + mask, _, window = m + mask = xr.DataArray(mask, dims=["y", "x"]) + winslices = dict(zip(["y", "x"], window.toslices())) + mask_win_list.append((mask, winslices)) + + filehandles = [xr.open_dataset("data/" + f) for f in file_list] + stats = [] + future_list = [] + + with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor: + # with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + + for f in filehandles: + print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r") + for _m, _w in mask_win_list: + future = executor.submit(xr_read_window, f, _w, mask=_m) + # cropped = xr_read_window(f, _w, mask=_m) + # stats.append(cropped.mean()) + future_list.append(future) + for _f in concurrent.futures.as_completed(future_list): + stats.append(_f.result().mean()) + + [f.close() for f in filehandles] + return stats diff --git a/ngen_forcing/defs.py b/ngen_forcing/defs.py new file mode 100644 index 0000000..58b2c1d --- /dev/null +++ b/ngen_forcing/defs.py @@ -0,0 +1,18 @@ +import rasterio.mask as riomask + + +def polymask(dataset, invert=False, all_touched=False): + def _polymask(poly): + return riomask.raster_geometry_mask( + dataset, [poly], invert=invert, all_touched=all_touched, crop=True + ) + + return _polymask + + +def xr_read_window(ds, window, mask=None): + data = ds.isel(window) + if mask is None: + return data + else: + return data.where(mask) diff --git a/ngen_forcing/process_nwm_forcing.py b/ngen_forcing/process_nwm_forcing.py index d7d86ae..20a2300 100644 --- a/ngen_forcing/process_nwm_forcing.py +++ b/ngen_forcing/process_nwm_forcing.py @@ -2,11 +2,22 @@ import xarray as xr import geopandas as gpd from rasterstats import zonal_stats + # import rasterio import pandas as pd +import time + +from TestConvert_NWMForcing_to_Ngen import ( + get_forcing_dict_newway, + get_forcing_dict_newway_parallel, + get_forcing_dict_newway_inverted, + get_forcing_dict_newway_inverted_parallel, +) + from pathlib import Path import warnings + warnings.simplefilter("ignore") # Read forcing files @@ -15,7 +26,7 @@ # TODO: Add looping through lists of forcing files # consider looking at the "listofnwmfilenames.py" in the data_access_examples repository. # Integer values for runinput, varinput, etc. are listed at the top of the file -# and an example is given in the `main` function. +# and an example is given in the `main` function. # import listofnwmfilenames # create_file_list( @@ -36,50 +47,118 @@ wget -P 03w -c https://nextgen-hydrofabric.s3.amazonaws.com/v1.2/nextgen_03W.gpkg """ -folder_prefix = Path('data') -list_of_files = [ - "nwm.t12z.medium_range.forcing.f001.conus.nc", - "nwm.t12z.medium_range.forcing.f002.conus.nc", - "nwm.t12z.medium_range.forcing.f003.conus.nc", -] - -# Read basin boundary file -f_03 = "03w/nextgen_03W.gpkg" -gpkg_divides = gpd.read_file(f_03, layer="divides") -list_of_vars = [ - "U2D" , - "V2D" , - "LWDOWN" , - "RAINRATE", - "T2D" , - "Q2D" , - "PSFC" , - "SWDOWN", -] - -df_dict = {} -for _v in list_of_vars: - df_dict[_v] = pd.DataFrame(index=gpkg_divides.index) - -reng = "rasterio" -sum_stat = "mean" - -for _nc_file in list_of_files: - # _nc_file = ("nwm.t00z.medium_range.forcing.f001.conus.nc") - _full_nc_file = (folder_prefix.joinpath(_nc_file)) - - with xr.open_dataset(_full_nc_file, engine=reng) as _xds: - for _v in list_of_vars: - _src = _xds[_v] - _aff2 = _src.rio.transform() - _arr2 = _src.values[0] - - breakpoint() - _df_zonal_stats = pd.DataFrame(zonal_stats(gpkg_divides, _arr2, affine=_aff2)) - # if adding statistics back to original GeoDataFrame - # gdf3 = pd.concat([gpkg_divides, _df_zonal_stats], axis=1) - df_dict[_v][_xds.time.values[0]]=_df_zonal_stats[sum_stat] + +def get_forcing_dict( + gpkg_divides, + folder_prefix, + filelist, + var_list, +): + reng = "rasterio" + sum_stat = "mean" + + df_dict = {} + for _v in var_list: + df_dict[_v] = pd.DataFrame(index=gpkg_divides.index) + + for _nc_file in filelist: + # _nc_file = ("nwm.t00z.medium_range.forcing.f001.conus.nc") + _full_nc_file = folder_prefix.joinpath(_nc_file) + + with xr.open_dataset(_full_nc_file, engine=reng) as _xds: + for _v in var_list: + _src = _xds[_v] + _aff2 = _src.rio.transform() + _arr2 = _src.values[0] + + _df_zonal_stats = pd.DataFrame( + zonal_stats(gpkg_divides, _arr2, affine=_aff2) + ) + # if adding statistics back to original GeoDataFrame + # gdf3 = pd.concat([gpkg_divides, _df_zonal_stats], axis=1) + df_dict[_v][_xds.time.values[0]] = _df_zonal_stats[sum_stat] + + return df_dict + # TODO: Convert the output to CSV with something like # `gdf3.to_csv` + +def main(): + + folder_prefix = Path("data") + list_of_files = [ + f"nwm.t12z.medium_range.forcing.f{_r:03}.conus.nc" for _r in range(1, 241) + ] + + # Read basin boundary file + f_03 = "03w/nextgen_03W.gpkg" + gpkg_divides = gpd.read_file(f_03, layer="divides") + var_list = [ + "U2D", + "V2D", + "LWDOWN", + "RAINRATE", + "T2D", + "Q2D", + "PSFC", + "SWDOWN", + ] + + file_list = list_of_files[0:30] + gpkg_subset = gpkg_divides[0:2000] + #file_list = list_of_files[0:3] + #gpkg_subset = gpkg_divides[0:20] + feature_list = gpkg_subset.geometry.to_list() + + # start_time = time.time() + # print(f"Working on the original way") + # forcing_dict = get_forcing_dict( + # gpkg_subset, + # folder_prefix, + # file_list, + # var_list, + # ) + # print(time.time() - start_time) + + start_time = time.time() + print(f"Working on the new way") + fd2 = get_forcing_dict_newway( + feature_list, + folder_prefix, + file_list, + ) + print(time.time() - start_time) + + start_time = time.time() + print(f"Working on the new way with threading. (It's not much better.)") + fd3 = get_forcing_dict_newway_parallel( + feature_list, + folder_prefix, + file_list, + ) + print(time.time() - start_time) + + + start_time = time.time() + print(f"Working on the new way with loops reversed.") + fd4 = get_forcing_dict_newway_inverted( + feature_list, + folder_prefix, + file_list, + ) + print(time.time() - start_time) + + start_time = time.time() + print(f"Working on the new way with loops reversed with threading.") + fd4 = get_forcing_dict_newway_inverted_parallel( + feature_list, + folder_prefix, + file_list, + ) + print(time.time() - start_time) + + +if __name__ == "__main__": + main() From 3d4bf39cd84e5055ad5356329da0f17a4db4e347 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Thu, 22 Dec 2022 12:51:18 -0600 Subject: [PATCH 05/13] add parallel options --- .../TestConvert_NWMForcing_to_Ngen.py | 40 +++++++++++++------ ngen_forcing/process_nwm_forcing.py | 39 ++++++++++++++---- 2 files changed, 60 insertions(+), 19 deletions(-) diff --git a/ngen_forcing/TestConvert_NWMForcing_to_Ngen.py b/ngen_forcing/TestConvert_NWMForcing_to_Ngen.py index 8f4158f..2a34f9a 100644 --- a/ngen_forcing/TestConvert_NWMForcing_to_Ngen.py +++ b/ngen_forcing/TestConvert_NWMForcing_to_Ngen.py @@ -48,10 +48,12 @@ def get_forcing_dict_newway( def get_forcing_dict_newway_parallel( - feature_list, - folder_prefix, - file_list, - ): + feature_list, + folder_prefix, + file_list, + para="thread", + para_n=2, +): reng = "rasterio" _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) @@ -67,8 +69,14 @@ def get_forcing_dict_newway_parallel( ) filehandles = [xr.open_dataset("data/" + f) for f in file_list] - with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor: - # with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + if para == "process": + pool = concurrent.futures.ProcessPoolExecutor + elif para == "thread": + pool = concurrent.futures.ThreadPoolExecutor + else: + pool = concurrent.futures.ThreadPoolExecutor + + with pool(max_workers=para_n) as executor: stats = [] future_list = [] @@ -130,10 +138,12 @@ def get_forcing_dict_newway_inverted( def get_forcing_dict_newway_inverted_parallel( - feature_list, - folder_prefix, - file_list, - ): + feature_list, + folder_prefix, + file_list, + para="thread", + para_n=2, +): import concurrent.futures @@ -161,8 +171,14 @@ def get_forcing_dict_newway_inverted_parallel( stats = [] future_list = [] - with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor: - # with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + if para == "process": + pool = concurrent.futures.ProcessPoolExecutor + elif para == "thread": + pool = concurrent.futures.ThreadPoolExecutor + else: + pool = concurrent.futures.ThreadPoolExecutor + + with pool(max_workers=para_n) as executor: for f in filehandles: print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r") diff --git a/ngen_forcing/process_nwm_forcing.py b/ngen_forcing/process_nwm_forcing.py index 20a2300..5e1430a 100644 --- a/ngen_forcing/process_nwm_forcing.py +++ b/ngen_forcing/process_nwm_forcing.py @@ -106,10 +106,10 @@ def main(): "SWDOWN", ] - file_list = list_of_files[0:30] - gpkg_subset = gpkg_divides[0:2000] - #file_list = list_of_files[0:3] - #gpkg_subset = gpkg_divides[0:20] + # file_list = list_of_files[0:30] + # gpkg_subset = gpkg_divides[0:2000] + file_list = list_of_files[0:3] + gpkg_subset = gpkg_divides[0:200] feature_list = gpkg_subset.geometry.to_list() # start_time = time.time() @@ -132,14 +132,26 @@ def main(): print(time.time() - start_time) start_time = time.time() - print(f"Working on the new way with threading. (It's not much better.)") + print(f"Working on the new way with threading parallel.") fd3 = get_forcing_dict_newway_parallel( feature_list, folder_prefix, file_list, - ) + para="thread", + para_n=16, + ) print(time.time() - start_time) + start_time = time.time() + print(f"Working on the new way with process parallel.") + fd3 = get_forcing_dict_newway_parallel( + feature_list, + folder_prefix, + file_list, + para="process", + para_n=16, + ) + print(time.time() - start_time) start_time = time.time() print(f"Working on the new way with loops reversed.") @@ -151,11 +163,24 @@ def main(): print(time.time() - start_time) start_time = time.time() - print(f"Working on the new way with loops reversed with threading.") + print(f"Working on the new way with loops reversed with threading parallel.") + fd4 = get_forcing_dict_newway_inverted_parallel( + feature_list, + folder_prefix, + file_list, + para="thread", + para_n=16, + ) + print(time.time() - start_time) + + start_time = time.time() + print(f"Working on the new way with loops reversed with process parallel.") fd4 = get_forcing_dict_newway_inverted_parallel( feature_list, folder_prefix, file_list, + para="process", + para_n=16, ) print(time.time() - start_time) From 71c83701020b143ed14c344dbec730e3e4054910 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Mon, 2 Jan 2023 14:45:09 -0600 Subject: [PATCH 06/13] explain commented test block --- ngen_forcing/process_nwm_forcing.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ngen_forcing/process_nwm_forcing.py b/ngen_forcing/process_nwm_forcing.py index 5e1430a..2232691 100644 --- a/ngen_forcing/process_nwm_forcing.py +++ b/ngen_forcing/process_nwm_forcing.py @@ -112,6 +112,8 @@ def main(): gpkg_subset = gpkg_divides[0:200] feature_list = gpkg_subset.geometry.to_list() + # This way is extremely slow for anything more than a + # few files, so we comment it out of the test # start_time = time.time() # print(f"Working on the original way") # forcing_dict = get_forcing_dict( From 1b8efe8db9f65be865c53ec146066667a4d4962c Mon Sep 17 00:00:00 2001 From: James Halgren Date: Mon, 2 Jan 2023 14:57:45 -0600 Subject: [PATCH 07/13] better file names for process script --- ...ert_NWMForcing_to_Ngen.py => process_nwm_forcing_to_ngen.py} | 0 ...ocess_nwm_forcing.py => test_process_nwm_forcing_to_ngen.py} | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename ngen_forcing/{TestConvert_NWMForcing_to_Ngen.py => process_nwm_forcing_to_ngen.py} (100%) rename ngen_forcing/{process_nwm_forcing.py => test_process_nwm_forcing_to_ngen.py} (99%) diff --git a/ngen_forcing/TestConvert_NWMForcing_to_Ngen.py b/ngen_forcing/process_nwm_forcing_to_ngen.py similarity index 100% rename from ngen_forcing/TestConvert_NWMForcing_to_Ngen.py rename to ngen_forcing/process_nwm_forcing_to_ngen.py diff --git a/ngen_forcing/process_nwm_forcing.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py similarity index 99% rename from ngen_forcing/process_nwm_forcing.py rename to ngen_forcing/test_process_nwm_forcing_to_ngen.py index 2232691..5850bfb 100644 --- a/ngen_forcing/process_nwm_forcing.py +++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py @@ -8,7 +8,7 @@ import time -from TestConvert_NWMForcing_to_Ngen import ( +from process_nwm_forcing_to_ngen import ( get_forcing_dict_newway, get_forcing_dict_newway_parallel, get_forcing_dict_newway_inverted, From 47a2f8fc4337c62bb40fe855152b2e4e232b752e Mon Sep 17 00:00:00 2001 From: James Halgren Date: Fri, 27 Jan 2023 13:48:06 -0600 Subject: [PATCH 08/13] turn on slow test in committed file --- .../test_process_nwm_forcing_to_ngen.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py index 5850bfb..a7d1404 100644 --- a/ngen_forcing/test_process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py @@ -114,15 +114,15 @@ def main(): # This way is extremely slow for anything more than a # few files, so we comment it out of the test - # start_time = time.time() - # print(f"Working on the original way") - # forcing_dict = get_forcing_dict( - # gpkg_subset, - # folder_prefix, - # file_list, - # var_list, - # ) - # print(time.time() - start_time) + start_time = time.time() + print(f"Working on the old (slow) way") + fd1 = get_forcing_dict( + gpkg_subset, + folder_prefix, + file_list, + var_list, + ) + print(time.time() - start_time) start_time = time.time() print(f"Working on the new way") From 2dc42b5f6053ed0a91ebdfaeee0b303a9979ec0f Mon Sep 17 00:00:00 2001 From: James Halgren Date: Mon, 30 Jan 2023 11:55:36 -0600 Subject: [PATCH 09/13] hold file handles for slow method --- ngen_forcing/test_process_nwm_forcing_to_ngen.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py index a7d1404..5fe24a3 100644 --- a/ngen_forcing/test_process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py @@ -61,11 +61,16 @@ def get_forcing_dict( for _v in var_list: df_dict[_v] = pd.DataFrame(index=gpkg_divides.index) + ds_list = [] for _nc_file in filelist: # _nc_file = ("nwm.t00z.medium_range.forcing.f001.conus.nc") _full_nc_file = folder_prefix.joinpath(_nc_file) + ds_list.append(xr.open_dataset(_full_nc_file, engine=reng)) - with xr.open_dataset(_full_nc_file, engine=reng) as _xds: + for _i, _nc_file in enumerate(filelist): + _xds = ds_list[_i] + print(f"{_i}, {round(_i/len(filelist), 5)*100}".ljust(40), end="\r") + if 1 == 1: for _v in var_list: _src = _xds[_v] _aff2 = _src.rio.transform() @@ -78,6 +83,8 @@ def get_forcing_dict( # gdf3 = pd.concat([gpkg_divides, _df_zonal_stats], axis=1) df_dict[_v][_xds.time.values[0]] = _df_zonal_stats[sum_stat] + [_xds.close() for _xds in ds_list] + return df_dict From 889df5d482c28a8e25994fdb9bb8d96bd59a8d67 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Fri, 24 Mar 2023 10:24:46 -0500 Subject: [PATCH 10/13] fix duplicate variable names in test --- .../test_process_nwm_forcing_to_ngen.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py index 5fe24a3..b068ac1 100644 --- a/ngen_forcing/test_process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py @@ -49,7 +49,8 @@ def get_forcing_dict( - gpkg_divides, + feature_index, + feature_list, folder_prefix, filelist, var_list, @@ -59,7 +60,7 @@ def get_forcing_dict( df_dict = {} for _v in var_list: - df_dict[_v] = pd.DataFrame(index=gpkg_divides.index) + df_dict[_v] = pd.DataFrame(index=feature_index) ds_list = [] for _nc_file in filelist: @@ -77,7 +78,7 @@ def get_forcing_dict( _arr2 = _src.values[0] _df_zonal_stats = pd.DataFrame( - zonal_stats(gpkg_divides, _arr2, affine=_aff2) + zonal_stats(feature_list, _arr2, affine=_aff2) ) # if adding statistics back to original GeoDataFrame # gdf3 = pd.concat([gpkg_divides, _df_zonal_stats], axis=1) @@ -121,10 +122,12 @@ def main(): # This way is extremely slow for anything more than a # few files, so we comment it out of the test + start_time = time.time() print(f"Working on the old (slow) way") fd1 = get_forcing_dict( - gpkg_subset, + gpkg_subset.index, + feature_list, folder_prefix, file_list, var_list, @@ -142,7 +145,7 @@ def main(): start_time = time.time() print(f"Working on the new way with threading parallel.") - fd3 = get_forcing_dict_newway_parallel( + fd3t = get_forcing_dict_newway_parallel( feature_list, folder_prefix, file_list, @@ -153,7 +156,7 @@ def main(): start_time = time.time() print(f"Working on the new way with process parallel.") - fd3 = get_forcing_dict_newway_parallel( + fd3p = get_forcing_dict_newway_parallel( feature_list, folder_prefix, file_list, @@ -173,7 +176,7 @@ def main(): start_time = time.time() print(f"Working on the new way with loops reversed with threading parallel.") - fd4 = get_forcing_dict_newway_inverted_parallel( + fd5t = get_forcing_dict_newway_inverted_parallel( feature_list, folder_prefix, file_list, @@ -184,7 +187,7 @@ def main(): start_time = time.time() print(f"Working on the new way with loops reversed with process parallel.") - fd4 = get_forcing_dict_newway_inverted_parallel( + fd5p = get_forcing_dict_newway_inverted_parallel( feature_list, folder_prefix, file_list, From b6a49f73dbf576dcdf3fdc6cefa9c983e03a4a86 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Mon, 27 Mar 2023 16:28:17 -0500 Subject: [PATCH 11/13] update single-thread implementations --- ngen_forcing/defs.py | 8 ++ ngen_forcing/process_nwm_forcing_to_ngen.py | 77 +++++++++++++------ .../test_process_nwm_forcing_to_ngen.py | 4 + 3 files changed, 65 insertions(+), 24 deletions(-) diff --git a/ngen_forcing/defs.py b/ngen_forcing/defs.py index 58b2c1d..18175c8 100644 --- a/ngen_forcing/defs.py +++ b/ngen_forcing/defs.py @@ -16,3 +16,11 @@ def xr_read_window(ds, window, mask=None): return data else: return data.where(mask) + + +def xr_read_window_time(ds, window, mask=None, idx=None, time=None): + data = ds.isel(window) + if mask is None: + return idx, time, data + else: + return idx, time, data.where(mask) diff --git a/ngen_forcing/process_nwm_forcing_to_ngen.py b/ngen_forcing/process_nwm_forcing_to_ngen.py index 2a34f9a..f607e47 100644 --- a/ngen_forcing/process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/process_nwm_forcing_to_ngen.py @@ -2,6 +2,7 @@ from rasterio import _io, windows import concurrent.futures import xarray as xr +import pandas as pd class MemoryDataset(_io.MemoryDataset, windows.WindowMethodsMixin): @@ -15,36 +16,49 @@ def junk(): def get_forcing_dict_newway( + feature_index, feature_list, folder_prefix, file_list, + var_list, ): reng = "rasterio" - _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) - _template_arr = _xds.U2D.values + + _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) + _template_arr = _xds_dummy.U2D.values _u2d = MemoryDataset( _template_arr, - transform=_xds.U2D.rio.transform(), + transform=_xds_dummy.U2D.rio.transform(), gcps=None, rpcs=None, crs=None, copy=False, ) - filehandles = [xr.open_dataset("data/" + f) for f in file_list] - stats = [] - for i, m in enumerate(map(polymask(_u2d), feature_list)): + ds_list = [] + for _nc_file in file_list: + _full_nc_file = folder_prefix.joinpath(_nc_file) + ds_list.append(xr.open_dataset(_full_nc_file, engine=reng)) + + df_dict = {} + for _v in var_list: + df_dict[_v] = pd.DataFrame(index=feature_index) + + for i, feature in enumerate(feature_list): print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") - mask, _, window = m + mask, _, window = polymask(_u2d)(feature) mask = xr.DataArray(mask, dims=["y", "x"]) winslices = dict(zip(["y", "x"], window.toslices())) - for f in filehandles: - cropped = xr_read_window(f, winslices, mask=mask) - stats.append(cropped.mean()) - [f.close() for f in filehandles] # Returns None for each file + for j, _xds in enumerate(ds_list): + time_value = _xds.time.values[0] + cropped = xr_read_window(_xds, winslices, mask=mask) + stats = cropped.mean() + for var in var_list: + df_dict[var].loc[i, time_value] = stats[var] - return stats + [ds.close() for ds in ds_list] + return df_dict def get_forcing_dict_newway_parallel( @@ -98,43 +112,58 @@ def get_forcing_dict_newway_parallel( def get_forcing_dict_newway_inverted( + feature_index, feature_list, folder_prefix, file_list, + var_list, ): reng = "rasterio" - _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) - _template_arr = _xds.U2D.values + + _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) + _template_arr = _xds_dummy.U2D.values _u2d = MemoryDataset( _template_arr, - transform=_xds.U2D.rio.transform(), + transform=_xds_dummy.U2D.rio.transform(), gcps=None, rpcs=None, crs=None, copy=False, ) + ds_list = [] + for _nc_file in file_list: + _full_nc_file = folder_prefix.joinpath(_nc_file) + ds_list.append(xr.open_dataset(_full_nc_file, engine=reng)) - filehandles = [xr.open_dataset("data/" + f) for f in file_list] stats = [] - future_list = [] mask_win_list = [] - for i, m in enumerate(map(polymask(_u2d), feature_list)): + for i, feature in enumerate(feature_list): print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") - mask, _, window = m + mask, _, window = polymask(_u2d)(feature) mask = xr.DataArray(mask, dims=["y", "x"]) winslices = dict(zip(["y", "x"], window.toslices())) mask_win_list.append((mask, winslices)) - for f in filehandles: + for i, f in enumerate(ds_list): print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r") - for _m, _w in mask_win_list: + time_value = f.time.values[0] + # TODO: when we read the window, could the time be added as a dimension? + for j, (_m, _w) in enumerate(mask_win_list): cropped = xr_read_window(f, _w, mask=_m) - stats.append(cropped.mean()) + stats.append((j, time_value, cropped.mean())) - [f.close() for f in filehandles] - return stats + df_dict = {} + for _v in var_list: + df_dict[_v] = pd.DataFrame(index=feature_index) + + for j, t, s in stats: + for var in var_list: + df_dict[var].loc[j, t] = s[var] + + [ds.close() for ds in ds_list] + return df_dict def get_forcing_dict_newway_inverted_parallel( diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py index b068ac1..b7717d0 100644 --- a/ngen_forcing/test_process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py @@ -137,9 +137,11 @@ def main(): start_time = time.time() print(f"Working on the new way") fd2 = get_forcing_dict_newway( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, ) print(time.time() - start_time) @@ -168,9 +170,11 @@ def main(): start_time = time.time() print(f"Working on the new way with loops reversed.") fd4 = get_forcing_dict_newway_inverted( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, ) print(time.time() - start_time) From 0fd7d8b3a87fa7e1aeadb6b480add42df914c945 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Mon, 27 Mar 2023 16:28:43 -0500 Subject: [PATCH 12/13] update parallel implementations --- ngen_forcing/process_nwm_forcing_to_ngen.py | 82 +++++++++++++------ .../test_process_nwm_forcing_to_ngen.py | 8 ++ 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/ngen_forcing/process_nwm_forcing_to_ngen.py b/ngen_forcing/process_nwm_forcing_to_ngen.py index f607e47..ec299b5 100644 --- a/ngen_forcing/process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/process_nwm_forcing_to_ngen.py @@ -1,6 +1,5 @@ -from defs import xr_read_window, polymask +from defs import xr_read_window, polymask, xr_read_window_time from rasterio import _io, windows -import concurrent.futures import xarray as xr import pandas as pd @@ -62,17 +61,20 @@ def get_forcing_dict_newway( def get_forcing_dict_newway_parallel( + feature_index, feature_list, folder_prefix, file_list, + var_list, para="thread", para_n=2, ): + import concurrent.futures + reng = "rasterio" _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) _template_arr = _xds.U2D.values - _u2d = MemoryDataset( _template_arr, transform=_xds.U2D.rio.transform(), @@ -81,7 +83,10 @@ def get_forcing_dict_newway_parallel( crs=None, copy=False, ) - filehandles = [xr.open_dataset("data/" + f) for f in file_list] + ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list] + # ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list] + # TODO: figure out why using the rasterio engine DOES NOT WORK with parallel + # TODO: figure out why NOT using the rasterio engine produces a different result if para == "process": pool = concurrent.futures.ProcessPoolExecutor @@ -90,25 +95,38 @@ def get_forcing_dict_newway_parallel( else: pool = concurrent.futures.ThreadPoolExecutor + stats = [] + future_list = [] + with pool(max_workers=para_n) as executor: - stats = [] - future_list = [] - for i, m in enumerate(map(polymask(_u2d), feature_list)): - print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") - mask, _, window = m + for _i, _m in enumerate(map(polymask(_u2d), feature_list)): + print(f"{_i}, {round(_i/len(feature_list), 5)*100}".ljust(40), end="\r") + mask, _, window = _m mask = xr.DataArray(mask, dims=["y", "x"]) winslices = dict(zip(["y", "x"], window.toslices())) - for f in filehandles: - future = executor.submit(xr_read_window, f, winslices, mask=mask) + for ds in ds_list: + _t = ds.time.values[0] + future = executor.submit( + xr_read_window_time, ds, winslices, mask=mask, idx=_i, time=_t + ) # cropped = xr_read_window(f, winslices, mask=mask) # stats.append(cropped.mean()) future_list.append(future) for _f in concurrent.futures.as_completed(future_list): - stats.append(_f.result().mean()) + _j, _t, _s = _f.result() + stats.append((_j, _t, _s)) - [f.close() for f in filehandles] - return stats + df_dict = {} + for _v in var_list: + df_dict[_v] = pd.DataFrame(index=feature_index) + + for j, t, s in stats: + for var in var_list: + df_dict[var].loc[j, t] = s[var].mean() + + [ds.close() for ds in ds_list] + return df_dict def get_forcing_dict_newway_inverted( @@ -167,9 +185,11 @@ def get_forcing_dict_newway_inverted( def get_forcing_dict_newway_inverted_parallel( + feature_index, feature_list, folder_prefix, file_list, + var_list, para="thread", para_n=2, ): @@ -196,7 +216,11 @@ def get_forcing_dict_newway_inverted_parallel( winslices = dict(zip(["y", "x"], window.toslices())) mask_win_list.append((mask, winslices)) - filehandles = [xr.open_dataset("data/" + f) for f in file_list] + ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list] + # ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list] + # TODO: figure out why using the rasterio engine DOES NOT WORK with parallel + # TODO: figure out why NOT using the rasterio engine produces a different result + stats = [] future_list = [] @@ -209,15 +233,27 @@ def get_forcing_dict_newway_inverted_parallel( with pool(max_workers=para_n) as executor: - for f in filehandles: - print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r") - for _m, _w in mask_win_list: - future = executor.submit(xr_read_window, f, _w, mask=_m) - # cropped = xr_read_window(f, _w, mask=_m) + for j, ds in enumerate(ds_list): + print(f"{j}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r") + _t = ds.time.values[0] + for _i, (_m, _w) in enumerate(mask_win_list): + future = executor.submit( + xr_read_window_time, ds, _w, mask=_m, idx=_i, time=_t + ) + # cropped = xr_read_window(ds, _w, mask=_m) # stats.append(cropped.mean()) future_list.append(future) for _f in concurrent.futures.as_completed(future_list): - stats.append(_f.result().mean()) + _j, _t, _s = _f.result() + stats.append((_j, _t, _s)) + + df_dict = {} + for _v in var_list: + df_dict[_v] = pd.DataFrame(index=feature_index) - [f.close() for f in filehandles] - return stats + for j, t, s in stats: + for var in var_list: + df_dict[var].loc[j, t] = s[var].mean() + + [ds.close() for ds in ds_list] + return df_dict diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py index b7717d0..5ea89cc 100644 --- a/ngen_forcing/test_process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py @@ -148,9 +148,11 @@ def main(): start_time = time.time() print(f"Working on the new way with threading parallel.") fd3t = get_forcing_dict_newway_parallel( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, para="thread", para_n=16, ) @@ -159,9 +161,11 @@ def main(): start_time = time.time() print(f"Working on the new way with process parallel.") fd3p = get_forcing_dict_newway_parallel( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, para="process", para_n=16, ) @@ -181,9 +185,11 @@ def main(): start_time = time.time() print(f"Working on the new way with loops reversed with threading parallel.") fd5t = get_forcing_dict_newway_inverted_parallel( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, para="thread", para_n=16, ) @@ -192,9 +198,11 @@ def main(): start_time = time.time() print(f"Working on the new way with loops reversed with process parallel.") fd5p = get_forcing_dict_newway_inverted_parallel( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, para="process", para_n=16, ) From 476ee1f1d3a1cc93fd63f47435e30d3ada3357ee Mon Sep 17 00:00:00 2001 From: James Halgren Date: Thu, 13 Apr 2023 16:15:12 -0500 Subject: [PATCH 13/13] add parallel options to inverted parallel function --- ngen_forcing/process_nwm_forcing_to_ngen.py | 21 +++++++++---------- .../test_process_nwm_forcing_to_ngen.py | 4 +--- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/ngen_forcing/process_nwm_forcing_to_ngen.py b/ngen_forcing/process_nwm_forcing_to_ngen.py index ec299b5..300b626 100644 --- a/ngen_forcing/process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/process_nwm_forcing_to_ngen.py @@ -8,12 +8,6 @@ class MemoryDataset(_io.MemoryDataset, windows.WindowMethodsMixin): pass -def junk(): - - flist = gpkg_divides.geometry.to_list() - polys = flist - - def get_forcing_dict_newway( feature_index, feature_list, @@ -21,7 +15,6 @@ def get_forcing_dict_newway( file_list, var_list, ): - reng = "rasterio" _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) @@ -136,7 +129,6 @@ def get_forcing_dict_newway_inverted( file_list, var_list, ): - reng = "rasterio" _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) @@ -193,7 +185,6 @@ def get_forcing_dict_newway_inverted_parallel( para="thread", para_n=2, ): - import concurrent.futures reng = "rasterio" @@ -208,10 +199,15 @@ def get_forcing_dict_newway_inverted_parallel( copy=False, ) + ds_list = [xr.open_dataset("data/" + f) for f in file_list] + + stats = [] + future_list = [] mask_win_list = [] - for i, m in enumerate(map(polymask(_u2d), feature_list)): + + for i, feature in enumerate(feature_list): print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") - mask, _, window = m + mask, _, window = polymask(_u2d)(feature) mask = xr.DataArray(mask, dims=["y", "x"]) winslices = dict(zip(["y", "x"], window.toslices())) mask_win_list.append((mask, winslices)) @@ -232,6 +228,9 @@ def get_forcing_dict_newway_inverted_parallel( pool = concurrent.futures.ThreadPoolExecutor with pool(max_workers=para_n) as executor: + df_dict = {} + for _v in var_list: + df_dict[_v] = pd.DataFrame(index=feature_index) for j, ds in enumerate(ds_list): print(f"{j}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r") diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py index 5ea89cc..ed98587 100644 --- a/ngen_forcing/test_process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py @@ -94,7 +94,6 @@ def get_forcing_dict( def main(): - folder_prefix = Path("data") list_of_files = [ f"nwm.t12z.medium_range.forcing.f{_r:03}.conus.nc" for _r in range(1, 241) @@ -146,6 +145,7 @@ def main(): print(time.time() - start_time) start_time = time.time() + print(f"Working on the new way with threading parallel.") fd3t = get_forcing_dict_newway_parallel( gpkg_subset.index, @@ -170,7 +170,6 @@ def main(): para_n=16, ) print(time.time() - start_time) - start_time = time.time() print(f"Working on the new way with loops reversed.") fd4 = get_forcing_dict_newway_inverted( @@ -194,7 +193,6 @@ def main(): para_n=16, ) print(time.time() - start_time) - start_time = time.time() print(f"Working on the new way with loops reversed with process parallel.") fd5p = get_forcing_dict_newway_inverted_parallel(