From 21834c14c247b8ef9307a71e062dc69510bf5082 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Wed, 16 Apr 2025 13:53:57 +0200 Subject: [PATCH 1/2] adjusted code to work with new sample product locations --- integration/test_sen1_native.py | 1 + integration/test_sen2_analysis.py | 50 +++++++++++++--------- integration/test_sen2_native.py | 69 +++++++++++++----------------- tests/test_source.py | 46 ++++++++++++++++++++ tests/test_store.py | 46 -------------------- xarray_eopf/backend.py | 15 +------ xarray_eopf/source.py | 60 ++++++++++++++++++++++++++ xarray_eopf/store.py | 70 ------------------------------- 8 files changed, 168 insertions(+), 189 deletions(-) create mode 100644 tests/test_source.py delete mode 100644 tests/test_store.py create mode 100644 xarray_eopf/source.py delete mode 100644 xarray_eopf/store.py diff --git a/integration/test_sen1_native.py b/integration/test_sen1_native.py index 80c7cf3..e1c44f3 100644 --- a/integration/test_sen1_native.py +++ b/integration/test_sen1_native.py @@ -6,6 +6,7 @@ import xarray as xr +# TODO: adjust path to new locations bucket = "e05ab01a9d56408d82ac32d69a5aae2a:sample-data" path_prefix = "tutorial_data/cpm_v253" url_prefix = f"s3://{bucket}/{path_prefix}" diff --git a/integration/test_sen2_analysis.py b/integration/test_sen2_analysis.py index ac4a379..0ba4758 100644 --- a/integration/test_sen2_analysis.py +++ b/integration/test_sen2_analysis.py @@ -11,10 +11,11 @@ from xarray_eopf.spatial import get_spatial_vars from xarray_eopf.utils import timeit -bucket = "e05ab01a9d56408d82ac32d69a5aae2a:sample-data" -path_prefix = "tutorial_data/cpm_v253" -s3_prefix = f"s3://{bucket}/{path_prefix}" -https_prefix = f"https://{DEFAULT_ENDPOINT_URL}/{bucket}/{path_prefix}" +s02msil1c_bucket = "e05ab01a9d56408d82ac32d69a5aae2a:202504-s02msil1c" +s02msil2a_bucket = "e05ab01a9d56408d82ac32d69a5aae2a:202504-s02msil2a" +path_prefix = "15/products/cpm_v256" +l1c_filename = "S2B_MSIL1C_20250415T142749_N0511_R139_T25WEV_20250415T180239.zarr" +l2a_filename = "S2B_MSIL2A_20250415T142749_N0511_R139_T25WEV_20250415T181516.zarr" allowed_open_time = 5 # seconds show_chunking = False @@ -22,16 +23,24 @@ class Sentinel2AnalysisTest(TestCase): def test_open_dataset_sen2_l1c_s3(self): - self._test_open_dataset_sen2_l1c(s3_prefix) + self._test_open_dataset_sen2_l1c(f"s3://{s02msil1c_bucket}/{path_prefix}") + + def test_open_dataset_sen2_l2a_s3(self): + self._test_open_dataset_sen2_l2a(f"s3://{s02msil2a_bucket}/{path_prefix}") def test_open_dataset_sen2_l1c_https(self): - self._test_open_dataset_sen2_l1c(https_prefix) + self._test_open_dataset_sen2_l1c( + f"{DEFAULT_ENDPOINT_URL}/{s02msil1c_bucket}/{path_prefix}" + ) - def _test_open_dataset_sen2_l1c(self, url_prefix): - url = ( - f"{url_prefix}/" - "S2B_MSIL1C_20250113T103309_N0511_R108_T32TLQ_20250113T122458.zarr" + def test_open_dataset_sen2_l2a_https(self): + self._test_open_dataset_sen2_l2a( + f"{DEFAULT_ENDPOINT_URL}/{s02msil2a_bucket}/{path_prefix}" ) + + def _test_open_dataset_sen2_l1c(self, url_prefix): + # See https://stac.browser.user.eopf.eodc.eu/collections/sentinel-2-l1c/items/S2B_MSIL1C_20250415T142749_N0511_R139_T25WEV_20250415T180239 + url = f"{url_prefix}/{l1c_filename}" with timeit("open " + url) as result: # noinspection PyTypeChecker ds = xr.open_dataset( @@ -51,17 +60,8 @@ def _test_open_dataset_sen2_l1c(self, url_prefix): for var_name in spatial_vars.keys(): self.assertEqual((10980, 10980), ds[var_name].shape[-2:], msg=var_name) - def test_open_dataset_sen2_l2a_s3(self): - self._test_open_dataset_sen2_l2a(s3_prefix) - - def test_open_dataset_sen2_l2a_https(self): - self._test_open_dataset_sen2_l2a(https_prefix) - def _test_open_dataset_sen2_l2a(self, url_prefix): - url = ( - f"{url_prefix}/" - "S2A_MSIL2A_20240101T102431_N0510_R065_T32TNT_20240101T144052.zarr" - ) + url = f"{url_prefix}/{l2a_filename}" with timeit("open " + url) as result: # noinspection PyTypeChecker ds = xr.open_dataset( @@ -83,3 +83,13 @@ def _test_open_dataset_sen2_l2a(self, url_prefix): assert_data_arrays_are_chunked(self, spatial_vars, verbose=show_chunking) for var_name in spatial_vars.keys(): self.assertEqual((10980, 10980), ds[var_name].shape[-2:], msg=var_name) + + def test_production(self): + url = ( + "https://objectstore.eodc.eu:2222/" + "e05ab01a9d56408d82ac32d69a5aae2a:202504-s02msil2a/15/products/" + "cpm_v256/" + "S2B_MSIL2A_20250415T142749_N0511_R139_T25WEU_20250415T181516.zarr" + ) + ds = xr.open_dataset(url, engine="eopf-zarr") + self.assertIsInstance(ds, xr.Dataset) diff --git a/integration/test_sen2_native.py b/integration/test_sen2_native.py index 524fd34..a8c5111 100644 --- a/integration/test_sen2_native.py +++ b/integration/test_sen2_native.py @@ -11,27 +11,43 @@ from xarray_eopf.spatial import get_spatial_vars from xarray_eopf.utils import timeit -bucket = "e05ab01a9d56408d82ac32d69a5aae2a:sample-data" -path_prefix = "tutorial_data/cpm_v253" -s3_prefix = f"s3://{bucket}/{path_prefix}" -https_prefix = f"https://{DEFAULT_ENDPOINT_URL}/{bucket}/{path_prefix}" +s02msil1c_bucket = "e05ab01a9d56408d82ac32d69a5aae2a:202504-s02msil1c" +s02msil2a_bucket = "e05ab01a9d56408d82ac32d69a5aae2a:202504-s02msil2a" +path_prefix = "15/products/cpm_v256" +l1c_filename = "S2B_MSIL1C_20250415T142749_N0511_R139_T25WEV_20250415T180239.zarr" +l2a_filename = "S2B_MSIL2A_20250415T142749_N0511_R139_T25WEV_20250415T181516.zarr" allowed_open_time = 5 # seconds class Sentinel2NativeTest(TestCase): def test_open_datatree_sen2_l1c_s3(self): - self._test_open_datatree_sen2_l1c(s3_prefix) + self._test_open_datatree_sen2_l1c(f"s3://{s02msil1c_bucket}/{path_prefix}") + + def test_open_dataset_sen2_l1c_s3(self): + self._test_open_dataset_sen2_l1c(f"s3://{s02msil1c_bucket}/{path_prefix}") + + def test_open_dataset_sen2_l2a_s3(self): + self._test_open_dataset_sen2_l2a(f"s3://{s02msil2a_bucket}/{path_prefix}") def test_open_datatree_sen2_l1c_https(self): - self._test_open_datatree_sen2_l1c(https_prefix) + self._test_open_datatree_sen2_l1c( + f"{DEFAULT_ENDPOINT_URL}/{s02msil1c_bucket}/{path_prefix}" + ) + + def test_open_dataset_sen2_l1c_https(self): + self._test_open_dataset_sen2_l1c( + f"{DEFAULT_ENDPOINT_URL}/{s02msil1c_bucket}/{path_prefix}" + ) + + def test_open_dataset_sen2_l2a_https(self): + self._test_open_dataset_sen2_l2a( + f"{DEFAULT_ENDPOINT_URL}/{s02msil2a_bucket}/{path_prefix}" + ) def _test_open_datatree_sen2_l1c(self, url_prefix: str): # noinspection PyTypeChecker - url = ( - f"{url_prefix}/" - f"S2B_MSIL1C_20250113T103309_N0511_R108_T32TLQ_20250113T122458.zarr" - ) + url = f"{url_prefix}/{l1c_filename}" with timeit("open " + url) as result: # noinspection PyTypeChecker dt = xr.open_datatree(url, engine="eopf-zarr", op_mode="native", chunks={}) @@ -49,17 +65,8 @@ def _test_open_datatree_sen2_l1c(self, url_prefix: str): ) assert_data_arrays_are_chunked(self, spatial_vars, verbose=True) - def test_open_datatree_sen2_l2a_s3(self): - self._test_open_datatree_sen2_l2a(s3_prefix) - - def test_open_datatree_sen2_l2a_https(self): - self._test_open_datatree_sen2_l2a(https_prefix) - def _test_open_datatree_sen2_l2a(self, url_prefix: str): - url = ( - f"{url_prefix}/" - "S2A_MSIL2A_20240101T102431_N0510_R065_T32TNT_20240101T144052.zarr" - ) + url = f"{url_prefix}/{l2a_filename}.zarr" with timeit("open " + url) as result: # noinspection PyTypeChecker dt = xr.open_datatree(url, engine="eopf-zarr", op_mode="native", chunks={}) @@ -77,17 +84,8 @@ def _test_open_datatree_sen2_l2a(self, url_prefix: str): ) assert_data_arrays_are_chunked(self, spatial_vars, verbose=True) - def test_open_dataset_sen2_l1c_s3(self): - self._test_open_dataset_sen2_l1c(s3_prefix) - - def test_open_dataset_sen2_l1c_https(self): - self._test_open_dataset_sen2_l1c(https_prefix) - def _test_open_dataset_sen2_l1c(self, url_prefix: str): - url = ( - f"{url_prefix}/" - "S2B_MSIL1C_20250113T103309_N0511_R108_T32TLQ_20250113T122458.zarr" - ) + url = f"{url_prefix}/{l1c_filename}" with timeit(url) as result: # noinspection PyTypeChecker ds = xr.open_dataset(url, engine="eopf-zarr", op_mode="native", chunks={}) @@ -105,17 +103,8 @@ def _test_open_dataset_sen2_l1c(self, url_prefix: str): self.assertEqual(43, len(spatial_vars)) assert_data_arrays_are_chunked(self, spatial_vars, verbose=True) - def test_open_dataset_sen2_l2a_s3(self): - self._test_open_dataset_sen2_l2a(s3_prefix) - - def test_open_dataset_sen2_l2a_https(self): - self._test_open_dataset_sen2_l2a(https_prefix) - def _test_open_dataset_sen2_l2a(self, url_prefix: str): - url = ( - f"{url_prefix}/" - "S2A_MSIL2A_20240101T102431_N0510_R065_T32TNT_20240101T144052.zarr" - ) + url = f"{url_prefix}/{l2a_filename}" with timeit(url) as result: # noinspection PyTypeChecker ds = xr.open_dataset(url, engine="eopf-zarr", op_mode="native", chunks={}) diff --git a/tests/test_source.py b/tests/test_source.py new file mode 100644 index 0000000..d16df9d --- /dev/null +++ b/tests/test_source.py @@ -0,0 +1,46 @@ +# Copyright (c) 2025 by EOPF Sample Service team and contributors +# Permissions are hereby granted under the terms of the Apache 2.0 License: +# https://opensource.org/license/apache-2-0. +from pathlib import Path +from unittest import TestCase + +import fsspec +import pytest +import s3fs + +from xarray_eopf.source import normalize_source + + +class NormalizeSourceTest(TestCase): + def test_s3_url(self): + url = "s3://no-bucket/test.zarr" + store = normalize_source(url, None) + self.assertIsInstance(store, fsspec.FSMap) + self.assertIsInstance(store.fs, s3fs.S3FileSystem) + self.assertEqual("no-bucket/test.zarr", store.root) + + def test_ceph_s3_url(self): + ceph_url = "s3://no-bucket:e6f4/test.zarr" + store = normalize_source(ceph_url, None) + self.assertIsInstance(store, fsspec.FSMap) + self.assertIsInstance(store.fs, s3fs.S3FileSystem) + self.assertEqual("no-bucket:e6f4/test.zarr", store.root) + + def test_https_url(self): + path = "https://unknown.object.storage.com/no-bucket/test.zarr" + source = normalize_source(path, None) + self.assertEqual(path, source) + + def test_other(self): + mapping = {} + self.assertIs(mapping, normalize_source(mapping, None)) + + path = Path("data/test.zarr") + self.assertIs(path, normalize_source(path, None)) + + # noinspection PyMethodMayBeStatic + def test_fail(self): + with pytest.raises( + ValueError, match="storage_options argument applies only to paths or URLs" + ): + normalize_source({}, {}) diff --git a/tests/test_store.py b/tests/test_store.py deleted file mode 100644 index 4e7d724..0000000 --- a/tests/test_store.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright (c) 2025 by EOPF Sample Service team and contributors -# Permissions are hereby granted under the terms of the Apache 2.0 License: -# https://opensource.org/license/apache-2-0. - -from unittest import TestCase - -import fsspec -import pytest -import s3fs - -from xarray_eopf.store import open_store - - -class OpenStoreTest(TestCase): - def test_s3_url(self): - store = open_store("s3://no-bucket/test.zarr", None, None) - self.assertIsInstance(store, fsspec.FSMap) - self.assertIsInstance(store.fs, s3fs.S3FileSystem) - - def test_ceph_s3_url(self): - store = open_store("s3://no-bucket:e6f4/test.zarr", None, None) - self.assertIsInstance(store, fsspec.FSMap) - self.assertIsInstance(store.fs, s3fs.S3FileSystem) - - def test_https_url(self): - store = open_store( - "https://unknown.object.storage.com/no-bucket/test.zarr", None, None - ) - self.assertIsInstance(store, fsspec.FSMap) - self.assertIsInstance(store.fs, fsspec.get_filesystem_class("http")) - - def test_other(self): - filename_or_obj = {} - store = open_store(filename_or_obj, None, None) - self.assertIs(store, filename_or_obj) - - # noinspection PyMethodMayBeStatic - def test_fail(self): - with pytest.raises( - ValueError, match="protocol argument applies only to paths or URLs" - ): - _store = open_store({}, "s3", None) - with pytest.raises( - ValueError, match="storage_options argument applies only to paths or URLs" - ): - _store = open_store({}, None, {}) diff --git a/xarray_eopf/backend.py b/xarray_eopf/backend.py index c5672ba..c7ad71b 100644 --- a/xarray_eopf/backend.py +++ b/xarray_eopf/backend.py @@ -21,7 +21,7 @@ ) from .filter import filter_dataset from .flatten import flatten_datatree, flatten_datatree_as_dict -from .store import open_store +from .source import normalize_source from .utils import assert_arg_is_one_of @@ -40,7 +40,6 @@ def open_datatree( *, op_mode: OpMode = OP_MODE_ANALYSIS, product_type: str | None = None, - protocol: str | None = None, storage_options: Mapping[str, Any] | None = None, drop_variables: str | Iterable[str] | None = None, decode_timedelta: ( @@ -59,10 +58,6 @@ def open_datatree( Only used if `op_mode="analysis"`; typically not required if the filename inherent to `filename_or_obj` adheres to EOPF naming conventions. - protocol: If `filename_or_obj` is a file path or URL, - it forces using the specified filesystem protocol. - Otherwise, the protocol will be derived from the file path or URL. - Will be passed to [`fsspec.filesystem()`](https://filesystem-spec.readthedocs.io/en/latest/usage.html). storage_options: If `filename_or_obj` is a file path or URL, these options specify the source filesystem. Will be passed to [`fsspec.filesystem()`](https://filesystem-spec.readthedocs.io/en/latest/usage.html). @@ -78,7 +73,7 @@ def open_datatree( assert_arg_is_one_of(op_mode, "op_mode", OP_MODES) - fs_store = open_store(filename_or_obj, protocol, storage_options) + fs_store = normalize_source(filename_or_obj, storage_options) datatree = xr.open_datatree( fs_store, @@ -107,7 +102,6 @@ def open_dataset( *, op_mode: OpMode = OP_MODE_ANALYSIS, # params for op_mode=native/analysis - protocol: str | None = None, storage_options: Mapping[str, Any] | None = None, group_sep: str = "_", variables: str | Iterable[str] | None = None, @@ -133,10 +127,6 @@ def open_dataset( Only used if `op_mode="analysis"`; typically not required if the filename inherent to `filename_or_obj` adheres to EOPF naming conventions. - protocol: If `filename_or_obj` is a file path or URL, - it forces using the specified filesystem protocol. - Otherwise, the protocol will be derived from the file path or URL. - Will be passed to [`fsspec.filesystem()`](https://filesystem-spec.readthedocs.io/en/latest/usage.html). storage_options: If `filename_or_obj` is a file path or URL, these options specify the source filesystem. Will be passed to [`fsspec.filesystem()`](https://filesystem-spec.readthedocs.io/en/latest/usage.html). @@ -167,7 +157,6 @@ def open_dataset( datatree = self.open_datatree( filename_or_obj, op_mode="native", - protocol=protocol, storage_options=storage_options, # here as it is required for all backends drop_variables=drop_variables, diff --git a/xarray_eopf/source.py b/xarray_eopf/source.py new file mode 100644 index 0000000..544c93f --- /dev/null +++ b/xarray_eopf/source.py @@ -0,0 +1,60 @@ +# Copyright (c) 2025 by EOPF Sample Service team and contributors +# Permissions are hereby granted under the terms of the Apache 2.0 License: +# https://opensource.org/license/apache-2-0. +from pathlib import Path + +from collections.abc import Mapping +from typing import Any + +import fsspec +import s3fs + +from xarray_eopf.constants import DEFAULT_ENDPOINT_URL + + +def normalize_source(source: Any, storage_options: Mapping[str, Any] | None) -> Any: + if isinstance(source, (str, Path)): + protocol, root = fsspec.core.split_protocol(source) + if protocol == "s3": + return _get_s3_store(root, storage_options) + else: + if storage_options is not None: + raise ValueError("storage_options argument applies only to paths or URLs") + return source + + +def _get_s3_store(root: str, storage_options: Mapping[str, Any] | None) -> fsspec.FSMap: + # CEPH uses a non-standard colon to separate tenant name from + # the bucket name. We need to convince boto3 to work with that. + storage_options = storage_options or {} + is_ceph_fs = ":" in root + if ( + "anon" not in storage_options + and "client" not in storage_options + and "secret" not in storage_options + ): + storage_options["anon"] = True + if ( + is_ceph_fs + and "endpoint_url" not in storage_options + and "endpoint_url" not in storage_options.get("client_kwargs", {}) + ): + storage_options["endpoint_url"] = DEFAULT_ENDPOINT_URL + + s3_fs = s3fs.S3FileSystem(**storage_options) + if is_ceph_fs: + # The following is a hack to force boto3 to deal with colons + # in bucket names. + # First unregister handler to make boto3 work with CEPH + # noinspection PyProtectedMember + handlers = s3_fs.s3.meta.events._emitter._handlers + handlers_to_unregister = handlers.prefix_search("before-parameter-build.s3") + if len(handlers_to_unregister): + # The first handler should be the function 'validate_bucket_name()' + handler_to_unregister = handlers_to_unregister[0] + # noinspection PyProtectedMember + s3_fs.s3.meta.events._emitter.unregister( + "before-parameter-build.s3", handler_to_unregister + ) + + return s3_fs.get_mapper(root=root, create=False, check=False) diff --git a/xarray_eopf/store.py b/xarray_eopf/store.py deleted file mode 100644 index 8467b0a..0000000 --- a/xarray_eopf/store.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright (c) 2025 by EOPF Sample Service team and contributors -# Permissions are hereby granted under the terms of the Apache 2.0 License: -# https://opensource.org/license/apache-2-0. - -from collections.abc import Mapping -from typing import Any - -import fsspec -import s3fs - -from xarray_eopf.constants import DEFAULT_ENDPOINT_URL - - -def open_store( - filename_or_obj: Any, - protocol: str | None, - storage_options: Mapping[str, Any] | None, -) -> Any: - if isinstance(filename_or_obj, str): - return _open_fs_store(filename_or_obj, protocol, storage_options) - else: - if protocol is not None: - raise ValueError("protocol argument applies only to paths or URLs") - if storage_options is not None: - raise ValueError("storage_options argument applies only to paths or URLs") - return filename_or_obj - - -def _open_fs_store( - path_or_url: str, protocol: str | None, storage_options: Mapping[str, Any] | None -) -> fsspec.FSMap: - _protocol, root = fsspec.core.split_protocol(path_or_url) - protocol = protocol or _protocol or "file" - # CEPH uses a non-standard colon to separate tenant name from - # the bucket name. We need to convince boto3 to work with that. - storage_options = storage_options or {} - is_ceph_fs = False - if protocol == "s3": - is_ceph_fs = ":" in root - if ( - "anon" not in storage_options - and "client" not in storage_options - and "secret" not in storage_options - ): - storage_options["anon"] = True - if ( - is_ceph_fs - and "endpoint_url" not in storage_options - and "endpoint_url" not in storage_options.get("client_kwargs", {}) - ): - storage_options["endpoint_url"] = DEFAULT_ENDPOINT_URL - - fs = fsspec.filesystem(protocol, **storage_options) - if is_ceph_fs and isinstance(fs, s3fs.S3FileSystem): - # The following is a hack to force - # boto3 to deal with colons in bucket names - s3_fs: s3fs.S3FileSystem = fs - # unregister handler to make boto3 work with CEPH - # noinspection PyProtectedMember - handlers = s3_fs.s3.meta.events._emitter._handlers - handlers_to_unregister = handlers.prefix_search("before-parameter-build.s3") - if len(handlers_to_unregister): - # The first handler should be the function 'validate_bucket_name()' - handler_to_unregister = handlers_to_unregister[0] - # noinspection PyProtectedMember - s3_fs.s3.meta.events._emitter.unregister( - "before-parameter-build.s3", handler_to_unregister - ) - - return fs.get_mapper(root=root, create=False, check=False) From fc38a8e67660b8671e6a91f59a9cc95225edb806 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Wed, 16 Apr 2025 14:11:59 +0200 Subject: [PATCH 2/2] adjusted example notebooks --- examples/open-sen2-analysis.ipynb | 426 +-- examples/open-sen2-native.ipynb | 5578 +++++++++++------------------ 2 files changed, 2276 insertions(+), 3728 deletions(-) diff --git a/examples/open-sen2-analysis.ipynb b/examples/open-sen2-analysis.ipynb index 47a80bb..b6b9ecd 100644 --- a/examples/open-sen2-analysis.ipynb +++ b/examples/open-sen2-analysis.ipynb @@ -26,9 +26,9 @@ "metadata": {}, "outputs": [], "source": [ - "path = (\n", - " \"s3://e05ab01a9d56408d82ac32d69a5aae2a:sample-data/tutorial_data/\"\n", - " \"cpm_v253/S2A_MSIL2A_20240101T102431_N0510_R065_T32TNT_20240101T144052.zarr\"\n", + "url = (\n", + " \"https://objectstore.eodc.eu:2222/e05ab01a9d56408d82ac32d69a5aae2a:202504-s02msil2a/15/products/cpm_v256/\"\n", + " \"S2B_MSIL2A_20250415T142749_N0511_R139_T25WEV_20250415T181516.zarr\"\n", ")" ] }, @@ -415,7 +415,7 @@ "Dimensions: (x: 1830, y: 1830)\n", "Coordinates:\n", " * x (x) int64 15kB 500010 500070 500130 ... 609630 609690 609750\n", - " * y (y) int64 15kB 5300010 5299950 5299890 ... 5190330 5190270\n", + " * y (y) int64 15kB 8000010 7999950 7999890 ... 7890330 7890270\n", " band int64 8B ...\n", " spatial_ref int64 8B ...\n", "Data variables: (12/15)\n", @@ -445,7 +445,8 @@ " planimetric_stability_assessment_from_AOCS: ...\n", " product_quality_status: ...\n", " reflectance_correction_factor_from_the_Sun-Earth_distance_variation_compu...\n", - " spectral_band_of_reference: ...