diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatPlugin.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatPlugin.java index d24f7ac9d7cb2..7aba771a95a1c 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatPlugin.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatPlugin.java @@ -24,7 +24,6 @@ import org.opensearch.plugins.ExtensiblePlugin; import org.opensearch.plugins.Plugin; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -127,39 +126,25 @@ public Map> getFormatDescriptors( /** * Returns the store strategies from every participating sub-format plugin - * (primary + secondary). Sub-plugins that do not implement - * {@link DataFormatPlugin#getStoreStrategy()} contribute nothing. + * (primary + secondary), keyed by format name. Mirrors {@link #getFormatDescriptors}: + * each participating format is resolved through the registry, which delegates + * to the sub-plugin without re-entering this composite. */ @Override - public List getStoreStrategies(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) { + public Map getStoreStrategies(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) { Settings settings = indexSettings.getSettings(); String primaryFormatName = PRIMARY_DATA_FORMAT.get(settings); List secondaryFormatNames = SECONDARY_DATA_FORMATS.get(settings); - List out = new ArrayList<>(); - collectStrategy(dataFormatRegistry, primaryFormatName, out, indexSettings); - for (String secondary : secondaryFormatNames) { - collectStrategy(dataFormatRegistry, secondary, out, indexSettings); + Map strategies = new HashMap<>(); + if (primaryFormatName != null && primaryFormatName.isEmpty() == false) { + strategies.putAll(dataFormatRegistry.getStoreStrategies(indexSettings, dataFormatRegistry.format(primaryFormatName))); } - return List.copyOf(out); - } - - private static void collectStrategy( - DataFormatRegistry registry, - String formatName, - List out, - IndexSettings indexSettings - ) { - if (formatName == null || formatName.isEmpty()) { - return; - } - DataFormatPlugin plugin = registry.getPlugin(formatName); - if (plugin == null) { - return; - } - List contributed = plugin.getStoreStrategies(indexSettings, registry); - if (contributed != null) { - out.addAll(contributed); + for (String secondaryName : secondaryFormatNames) { + if (secondaryName != null && secondaryName.isEmpty() == false) { + strategies.putAll(dataFormatRegistry.getStoreStrategies(indexSettings, dataFormatRegistry.format(secondaryName))); + } } + return Map.copyOf(strategies); } } diff --git a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatPluginTests.java b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatPluginTests.java index d26474d2cd48d..201c023d15e14 100644 --- a/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatPluginTests.java +++ b/sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeDataFormatPluginTests.java @@ -15,7 +15,6 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.dataformat.DataFormat; import org.opensearch.index.engine.dataformat.DataFormatDescriptor; -import org.opensearch.index.engine.dataformat.DataFormatPlugin; import org.opensearch.index.engine.dataformat.DataFormatRegistry; import org.opensearch.index.engine.dataformat.StoreStrategy; import org.opensearch.test.OpenSearchTestCase; @@ -95,9 +94,12 @@ public void testGetStoreStrategiesEmptyWhenNoSubPlugins() { DataFormatRegistry registry = mock(DataFormatRegistry.class); IndexSettings indexSettings = buildIndexSettings(Settings.builder().put("index.composite.primary_data_format", "parquet").build()); - when(registry.getPlugin("parquet")).thenReturn(null); + // Registry resolves the format but the composite pipeline finds no plugin → empty map. + DataFormat parquetFormat = CompositeTestHelper.stubFormat("parquet", 2, java.util.Set.of()); + when(registry.format("parquet")).thenReturn(parquetFormat); + when(registry.getStoreStrategies(indexSettings, parquetFormat)).thenReturn(Map.of()); - List result = plugin.getStoreStrategies(indexSettings, registry); + Map result = plugin.getStoreStrategies(indexSettings, registry); assertTrue("Should return empty when no sub-plugin found", result.isEmpty()); } @@ -107,15 +109,14 @@ public void testGetStoreStrategiesCollectsFromPrimaryPlugin() { IndexSettings indexSettings = buildIndexSettings(Settings.builder().put("index.composite.primary_data_format", "parquet").build()); - DataFormatPlugin subPlugin = mock(DataFormatPlugin.class); + DataFormat parquetFormat = CompositeTestHelper.stubFormat("parquet", 2, java.util.Set.of()); StoreStrategy parquetStrategy = mock(StoreStrategy.class); - when(parquetStrategy.name()).thenReturn("parquet"); - when(subPlugin.getStoreStrategies(indexSettings, registry)).thenReturn(List.of(parquetStrategy)); - when(registry.getPlugin("parquet")).thenReturn(subPlugin); + when(registry.format("parquet")).thenReturn(parquetFormat); + when(registry.getStoreStrategies(indexSettings, parquetFormat)).thenReturn(Map.of("parquet", parquetStrategy)); - List result = plugin.getStoreStrategies(indexSettings, registry); + Map result = plugin.getStoreStrategies(indexSettings, registry); assertEquals(1, result.size()); - assertSame(parquetStrategy, result.get(0)); + assertSame(parquetStrategy, result.get("parquet")); } public void testGetStoreStrategiesCollectsPrimaryAndSecondary() { @@ -129,20 +130,18 @@ public void testGetStoreStrategiesCollectsPrimaryAndSecondary() { .build() ); - DataFormatPlugin luceneSubPlugin = mock(DataFormatPlugin.class); - when(luceneSubPlugin.getStoreStrategies(indexSettings, registry)).thenReturn(List.of()); - - DataFormatPlugin parquetSubPlugin = mock(DataFormatPlugin.class); + DataFormat luceneFormat = CompositeTestHelper.stubFormat("lucene", 1, java.util.Set.of()); + DataFormat parquetFormat = CompositeTestHelper.stubFormat("parquet", 2, java.util.Set.of()); StoreStrategy parquetStrategy = mock(StoreStrategy.class); - when(parquetStrategy.name()).thenReturn("parquet"); - when(parquetSubPlugin.getStoreStrategies(indexSettings, registry)).thenReturn(List.of(parquetStrategy)); - when(registry.getPlugin("lucene")).thenReturn(luceneSubPlugin); - when(registry.getPlugin("parquet")).thenReturn(parquetSubPlugin); + when(registry.format("lucene")).thenReturn(luceneFormat); + when(registry.format("parquet")).thenReturn(parquetFormat); + when(registry.getStoreStrategies(indexSettings, luceneFormat)).thenReturn(Map.of()); + when(registry.getStoreStrategies(indexSettings, parquetFormat)).thenReturn(Map.of("parquet", parquetStrategy)); - List result = plugin.getStoreStrategies(indexSettings, registry); + Map result = plugin.getStoreStrategies(indexSettings, registry); assertEquals(1, result.size()); - assertSame(parquetStrategy, result.get(0)); + assertSame(parquetStrategy, result.get("parquet")); } public void testGetStoreStrategiesEmptyForDefaultPrimaryWithoutPlugin() { @@ -150,9 +149,11 @@ public void testGetStoreStrategiesEmptyForDefaultPrimaryWithoutPlugin() { DataFormatRegistry registry = mock(DataFormatRegistry.class); IndexSettings indexSettings = buildIndexSettings(Settings.EMPTY); // defaults: primary=lucene, secondary=[] - when(registry.getPlugin("lucene")).thenReturn(null); + DataFormat luceneFormat = CompositeTestHelper.stubFormat("lucene", 1, java.util.Set.of()); + when(registry.format("lucene")).thenReturn(luceneFormat); + when(registry.getStoreStrategies(indexSettings, luceneFormat)).thenReturn(Map.of()); - List result = plugin.getStoreStrategies(indexSettings, registry); + Map result = plugin.getStoreStrategies(indexSettings, registry); assertTrue("Should return empty when lucene sub-plugin not found", result.isEmpty()); } diff --git a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java index a4d8d314d234f..34f90a918fd64 100644 --- a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java +++ b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java @@ -54,7 +54,7 @@ * {@link ParquetIndexingEngine} instances created in {@link #indexingEngine}. * *

For tiered storage, returns a {@link ParquetStoreStrategy} from - * {@link #getStoreStrategy()}. The composite store layer takes it from there — + * {@link #getStoreStrategies}. The composite store layer takes it from there — * construction of per-shard native registries, seeding from remote metadata, * routing directory events, and closing native resources are all handled * there. The plugin stays purely declarative. @@ -120,8 +120,8 @@ public Map> getFormatDescriptors(IndexSet } @Override - public StoreStrategy getStoreStrategy() { - return storeStrategy; + public Map getStoreStrategies(IndexSettings indexSettings, DataFormatRegistry registry) { + return Map.of(ParquetDataFormat.PARQUET_DATA_FORMAT_NAME, storeStrategy); } @Override diff --git a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/store/ParquetNativeFileRegistry.java b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/store/ParquetNativeFileRegistry.java new file mode 100644 index 0000000000000..59a0f7fe8a479 --- /dev/null +++ b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/store/ParquetNativeFileRegistry.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.parquet.store; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.engine.dataformat.NativeFileRegistry; +import org.opensearch.plugins.NativeStoreHandle; +import org.opensearch.repositories.NativeStoreRepository; + +import java.io.IOException; +import java.util.Map; + +/** + * Per-shard native file registry for the parquet format. + * + *

Wraps the Rust {@code TieredObjectStore} pointer that the parquet native + * reader consults to resolve file identifiers to blob paths. The store layer + * owns the lifecycle — this class is just the FFM boundary. + * + *

Read-only warm (current): every parquet file is REMOTE; the Java directory + * always reads from the remote store. The registry still receives + * seed/upload/remove events so the Rust {@code FileRegistry} stays consistent + * with the Java view. + */ +public final class ParquetNativeFileRegistry implements NativeFileRegistry { + + private static final Logger logger = LogManager.getLogger(ParquetNativeFileRegistry.class); + + private final ShardId shardId; + private NativeStoreHandle storeHandle = NativeStoreHandle.EMPTY; + + public ParquetNativeFileRegistry(ShardId shardId, boolean isWarm, NativeStoreRepository repoStore) { + this.shardId = shardId; + if (isWarm && repoStore != null && repoStore.isLive()) { + // TODO: FFM — storePtr = ts_create_tiered_object_store(localPath) + // TODO: FFM — ts_set_remote_store(storePtr, repoStore.getPointer()) + // this.storeHandle = new NativeStoreHandle(storePtr, TieredStorageNative::destroy); + logger.debug("Created ParquetNativeFileRegistry for shard [{}]", shardId); + } + } + + @Override + public void seed(Map fileToRemotePath) { + if (storeHandle.isLive() == false) { + return; + } + // TODO: FFM — ts_seed_file_locations(storeHandle.getPointer(), fileToRemotePath) + logger.trace("seed: shard=[{}], count={}", shardId, fileToRemotePath.size()); + } + + @Override + public void onUploaded(String file, String remotePath) { + if (storeHandle.isLive() == false) { + return; + } + // TODO: FFM — ts_after_sync_to_remote(storeHandle.getPointer(), file, remotePath) + logger.trace("onUploaded: shard=[{}], file=[{}], remotePath=[{}]", shardId, file, remotePath); + } + + @Override + public void onRemoved(String file) { + if (storeHandle.isLive() == false) { + return; + } + // TODO: FFM — ts_remove_file(storeHandle.getPointer(), file) + logger.trace("onRemoved: shard=[{}], file=[{}]", shardId, file); + } + + @Override + public void close() throws IOException { + storeHandle.close(); + } +} diff --git a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/store/ParquetStoreStrategy.java b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/store/ParquetStoreStrategy.java index 80a0cda55beb5..fca93b3ec57ad 100644 --- a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/store/ParquetStoreStrategy.java +++ b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/store/ParquetStoreStrategy.java @@ -10,30 +10,25 @@ import org.opensearch.index.engine.dataformat.NativeFileRegistryFactory; import org.opensearch.index.engine.dataformat.StoreStrategy; -import org.opensearch.parquet.engine.ParquetDataFormat; import java.util.Optional; /** * Store strategy for the parquet data format. * - *

Declares the parquet file-naming convention (files live under the - * {@code "parquet/"} subdirectory) and provides a factory for the per-shard - * native file registry that tracks parquet files for the Rust reader. + *

Uses the default {@code owns} / {@code remotePath} behaviour inherited + * from {@link StoreStrategy} (files live under {@code "parquet/"} prefix, blobs + * are laid out at {@code basePath + "parquet/" + blobKey}). The store layer + * supplies the format name when it invokes those methods, so the strategy + * itself does not carry the name. * - *

All other behaviour (per-shard construction, remote-metadata seeding, - * directory routing, close ordering) is handled by the store layer. This - * class is a pure declaration. + *

Provides a factory for the per-shard native file registry that tracks + * parquet files for the Rust reader. */ public final class ParquetStoreStrategy implements StoreStrategy { private static final NativeFileRegistryFactory FACTORY = ParquetNativeFileRegistry::new; - @Override - public String name() { - return ParquetDataFormat.PARQUET_DATA_FORMAT_NAME; - } - @Override public Optional nativeFileRegistry() { return Optional.of(FACTORY); diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index a5061f61a818e..a9e6dbba165cf 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -788,7 +788,7 @@ protected void closeInternal() { && this.dataFormatAwareStoreDirectoryFactory != null) { // Warm + format-aware: resolve per-shard store strategies and native store, // then let the factory build the StoreStrategyRegistry and directory stack. - List storeStrategies = dataFormatRegistry.getStoreStrategies( + Map storeStrategies = dataFormatRegistry.getStoreStrategies( this.indexSettings ); NativeStoreRepository nativeStore = resolveNativeStore(repositoriesService); diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java index a156ebff732cd..ecb6f3a621e04 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatPlugin.java @@ -11,7 +11,6 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.IndexSettings; -import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -25,7 +24,7 @@ *

  • {@link DataFormatDescriptor} via {@link #getFormatDescriptors} — * describes the format (name, checksum strategy, static * capabilities). Per-index value data.
  • - *
  • {@link StoreStrategy} via {@link #getStoreStrategy} — + *
  • {@link StoreStrategy} via {@link #getStoreStrategies} — * behavior for how the format participates in the tiered store * (file ownership, remote layout, optional native registry).
  • * @@ -65,35 +64,22 @@ default Map> getFormatDescriptors( } /** - * Behavior describing how this format participates in the tiered store. + * Returns the strategies describing how this format participates in the tiered store, + * keyed by the format name the strategy applies to. * - *

    Plugins that need tiered-store support return a non-null strategy; - * all other plumbing (per-shard lifecycle, seeding, routing, close) is - * handled by the store layer. + *

    Most plugins contribute a single entry (their own format). Composite plugins, + * which expose multiple formats per index, return one entry per participating format. + * A plugin that does not participate in the tiered store returns an empty map (default). * - * @return a strategy for this format, or {@code null} if the format does - * not participate in the tiered store - */ - default StoreStrategy getStoreStrategy() { - return null; - } - - /** - * Returns all strategies that apply for this plugin on the given index. - * - *

    The default returns a singleton list containing - * {@link #getStoreStrategy()} when it is non-null, otherwise an empty - * list. Composite plugins (which expose multiple formats per index) - * override this to contribute the strategies for every participating - * format. + *

    All cross-cutting work (per-shard lifecycle, seeding, routing, close) is handled + * by the store layer. Plugins only declare strategies here. * * @param indexSettings the index settings - * @param dataFormatRegistry the registry, used by composite plugins to - * resolve sub-format plugins - * @return the strategies that apply; never {@code null} + * @param dataFormatRegistry the registry, used by composite plugins to resolve + * sub-format plugins + * @return the strategies that apply, keyed by format name; never {@code null} */ - default List getStoreStrategies(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) { - StoreStrategy s = getStoreStrategy(); - return s == null ? List.of() : List.of(s); + default Map getStoreStrategies(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) { + return Map.of(); } } diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java index b50f97b6b316d..f1743d60f591c 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/DataFormatRegistry.java @@ -153,30 +153,49 @@ public Set getRegisteredFormats() { /** * Returns all {@link StoreStrategy} instances that apply to the active - * data format of the given index. + * data format of the given index, keyed by the format name the strategy + * applies to. * *

    Called once per shard at open time. The store layer uses the returned * strategies to construct per-shard native file registries, seed them from * remote metadata, and route directory events. * * @param indexSettings the index settings for this shard - * @return the list of applicable strategies, or an empty list when no + * @return the map of applicable strategies, or an empty map when no * pluggable data format is configured or the configured format * does not participate in the tiered store */ - public List getStoreStrategies(IndexSettings indexSettings) { + public Map getStoreStrategies(IndexSettings indexSettings) { String dataformatName = indexSettings.pluggableDataFormat(); if (dataformatName != null && dataformatName.isEmpty() == false) { DataFormat format = dataFormats.get(dataformatName); if (format != null) { DataFormatPlugin plugin = dataFormatPluginRegistry.get(format); if (plugin != null) { - List strategies = plugin.getStoreStrategies(indexSettings, this); - return strategies == null ? List.of() : List.copyOf(strategies); + Map strategies = plugin.getStoreStrategies(indexSettings, this); + return strategies == null ? Map.of() : Map.copyOf(strategies); } } } - return List.of(); + return Map.of(); + } + + /** + * Returns store strategies for a specific data format, bypassing the + * {@code pluggable_dataformat} index setting lookup. Used by composite + * plugins to resolve child strategies without recursion. + * + * @param indexSettings the index settings + * @param dataFormat the specific data format to get strategies for + * @return map of format name to strategy, or empty map if the format is not registered + */ + public Map getStoreStrategies(IndexSettings indexSettings, DataFormat dataFormat) { + DataFormatPlugin plugin = dataFormatPluginRegistry.get(dataFormat); + if (plugin == null) { + return Map.of(); + } + Map strategies = plugin.getStoreStrategies(indexSettings, this); + return strategies == null ? Map.of() : strategies; } /** diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/StoreStrategy.java b/server/src/main/java/org/opensearch/index/engine/dataformat/StoreStrategy.java index 770e5eae5faf5..7051e6c0c29a8 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/StoreStrategy.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/StoreStrategy.java @@ -15,34 +15,27 @@ /** * Strategy describing how a data format participates in the tiered store. * - *

    Data format plugins return an implementation from - * {@link DataFormatPlugin#getStoreStrategy()} to tell the store layer three - * things: + *

    Returned by {@link DataFormatPlugin#getStoreStrategies} keyed by the + * format name. The strategy itself is stateless regarding its name — the map + * key supplies the identity — and the store layer passes the name into + * {@link #owns} and {@link #remotePath} whenever behaviour depends on it. + * + *

    A strategy contributes three pieces of behaviour: *

      - *
    • the format name (used to correlate with {@link DataFormat#name()})
    • - *
    • which files in the directory belong to this format ({@link #owns})
    • - *
    • the layout the format uses on the remote store ({@link #remotePath})
    • - *
    • optionally, a per-shard {@link NativeFileRegistryFactory} for formats - * with a native reader
    • + *
    • {@link #owns} — which files in the directory belong to this format
    • + *
    • {@link #remotePath} — how the format lays out blobs on the remote store
    • + *
    • optionally, {@link #nativeFileRegistry()} for formats with a native reader
    • *
    * *

    All cross-cutting work (per-shard lifecycle, seeding from remote metadata, * directory routing, close ordering, sync notifications) is handled by the - * store layer, not by the plugin. A plugin supplies behavior; the layer - * supplies state. + * store layer, not by the plugin. * * @opensearch.experimental */ @ExperimentalApi public interface StoreStrategy { - /** - * The data format name, e.g. {@code "parquet"}. Must match the name used by - * {@link DataFormat#name()} so the store layer can correlate strategies - * with formats resolved elsewhere. - */ - String name(); - /** * Returns true if the given file identifier belongs to this format. * @@ -50,34 +43,38 @@ public interface StoreStrategy { * whose prefix is the format name (e.g. {@code "parquet/seg_0.parquet"}). * Implementations may override to use a different layout. * + * @param name the format name the store layer associated with this + * strategy (the key it was registered under) * @param file file identifier as produced by the directory layer */ - default boolean owns(String file) { + default boolean owns(String name, String file) { if (file == null) { return false; } - return file.startsWith(name() + "/"); + return file.startsWith(name + "/"); } /** * Returns the fully-qualified remote blob path for a file owned by this format. * *

    The default convention places blobs at - * {@code basePath + name() + "/" + blobKey}. Implementations may override + * {@code basePath + name + "/" + blobKey}. Implementations may override * when the format uses a different layout on the remote store. * + * @param name the format name the store layer associated with this + * strategy (the key it was registered under) * @param basePath the repository base path (may be empty) * @param file the file identifier (unused by the default layout) * @param blobKey the uploaded blob key returned by * {@link org.opensearch.index.store.RemoteSegmentStoreDirectory.UploadedSegmentMetadata#getUploadedFilename()} * @return the remote blob path */ - default String remotePath(String basePath, String file, String blobKey) { + default String remotePath(String name, String basePath, String file, String blobKey) { StringBuilder sb = new StringBuilder(); if (basePath != null && basePath.isEmpty() == false) { sb.append(basePath); } - sb.append(name()).append('/').append(blobKey); + sb.append(name).append('/').append(blobKey); return sb.toString(); } diff --git a/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryFactory.java index e10a1f830aaed..8584a6956f70e 100644 --- a/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/DataFormatAwareStoreDirectoryFactory.java @@ -19,7 +19,6 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.List; import java.util.Map; /** @@ -86,7 +85,7 @@ default DataFormatAwareStoreDirectory newDataFormatAwareStoreDirectory( ShardPath shardPath, IndexStorePlugin.DirectoryFactory localDirectoryFactory, Map checksumStrategies, - List storeStrategies, + Map storeStrategies, NativeStoreRepository nativeStore, boolean isWarm, RemoteSegmentStoreDirectory remoteDirectory, diff --git a/server/src/main/java/org/opensearch/storage/directory/StoreStrategyRegistry.java b/server/src/main/java/org/opensearch/storage/directory/StoreStrategyRegistry.java index fdefe291afddf..18dd46e4839b5 100644 --- a/server/src/main/java/org/opensearch/storage/directory/StoreStrategyRegistry.java +++ b/server/src/main/java/org/opensearch/storage/directory/StoreStrategyRegistry.java @@ -52,16 +52,27 @@ public final class StoreStrategyRegistry implements Closeable { /** Sentinel for "no strategies registered on this shard". Safe to close. */ public static final StoreStrategyRegistry EMPTY = new StoreStrategyRegistry( - Collections.emptyList(), + Collections.emptyMap(), Collections.emptyMap() ); - private final List strategies; + /** Strategies keyed by format name. */ + private final Map strategies; /** Native registries keyed by format name. Absent for strategies without one. */ private final Map nativeRegistries; - private StoreStrategyRegistry(List strategies, Map nativeRegistries) { - this.strategies = List.copyOf(strategies); + /** + * A strategy paired with the format name it is registered under. Used + * internally for routing decisions so callers never need to re-derive the + * name from the strategy. + * + * @opensearch.experimental + */ + @ExperimentalApi + public record Match(String name, StoreStrategy strategy) {} + + private StoreStrategyRegistry(Map strategies, Map nativeRegistries) { + this.strategies = Map.copyOf(strategies); this.nativeRegistries = Map.copyOf(nativeRegistries); } @@ -76,7 +87,7 @@ private StoreStrategyRegistry(List strategies, Map strategies, + Map strategies, RemoteSegmentStoreDirectory remoteDirectory ) { if (strategies == null || strategies.isEmpty()) { @@ -95,14 +106,16 @@ public static StoreStrategyRegistry open( List created = new ArrayList<>(); boolean success = false; try { - for (StoreStrategy strategy : strategies) { + for (Map.Entry entry : strategies.entrySet()) { + String name = entry.getKey(); + StoreStrategy strategy = entry.getValue(); NativeFileRegistryFactory factory = strategy.nativeFileRegistry().orElse(null); if (factory == null) { continue; } NativeFileRegistry registry = factory.create(shardId, isWarm, nativeStore); if (registry != null) { - nativeRegistries.put(strategy.name(), registry); + nativeRegistries.put(name, registry); created.add(registry); } } @@ -121,15 +134,16 @@ public static StoreStrategyRegistry open( /** * Returns the strategy that owns {@code file}, or {@code null} if no - * registered strategy claims it. + * registered strategy claims it. The returned {@link Match} carries both + * the registered name and the strategy object. */ - public StoreStrategy strategyFor(String file) { + public Match matchFor(String file) { if (file == null) { return null; } - for (StoreStrategy strategy : strategies) { - if (strategy.owns(file)) { - return strategy; + for (Map.Entry entry : strategies.entrySet()) { + if (entry.getValue().owns(entry.getKey(), file)) { + return new Match(entry.getKey(), entry.getValue()); } } return null; @@ -153,15 +167,15 @@ public boolean hasNativeRegistries() { * registry */ public boolean onUploaded(String file, String basePath, String uploadedBlobKey) { - StoreStrategy strategy = strategyFor(file); - if (strategy == null) { + Match match = matchFor(file); + if (match == null) { return false; } - NativeFileRegistry registry = nativeRegistries.get(strategy.name()); + NativeFileRegistry registry = nativeRegistries.get(match.name()); if (registry == null) { return false; } - registry.onUploaded(file, strategy.remotePath(basePath, file, uploadedBlobKey)); + registry.onUploaded(file, match.strategy().remotePath(match.name(), basePath, file, uploadedBlobKey)); return true; } @@ -169,11 +183,11 @@ public boolean onUploaded(String file, String basePath, String uploadedBlobKey) * Forwards a removal event. Returns true if dispatched, false otherwise. */ public boolean onRemoved(String file) { - StoreStrategy strategy = strategyFor(file); - if (strategy == null) { + Match match = matchFor(file); + if (match == null) { return false; } - NativeFileRegistry registry = nativeRegistries.get(strategy.name()); + NativeFileRegistry registry = nativeRegistries.get(match.name()); if (registry == null) { return false; } @@ -187,7 +201,7 @@ public void close() throws IOException { } private static void seedFromRemoteMetadata( - List strategies, + Map strategies, Map nativeRegistries, RemoteSegmentStoreDirectory remoteDirectory ) { @@ -203,20 +217,22 @@ private static void seedFromRemoteMetadata( Map> perStrategy = new HashMap<>(); for (Map.Entry entry : uploaded.entrySet()) { String file = entry.getKey(); + String owningName = null; StoreStrategy owning = null; - for (StoreStrategy strategy : strategies) { - if (strategy.owns(file)) { - owning = strategy; + for (Map.Entry s : strategies.entrySet()) { + if (s.getValue().owns(s.getKey(), file)) { + owningName = s.getKey(); + owning = s.getValue(); break; } } - if (owning == null || nativeRegistries.containsKey(owning.name()) == false) { + if (owning == null || nativeRegistries.containsKey(owningName) == false) { continue; } String blobKey = entry.getValue().getUploadedFilename(); perStrategy - .computeIfAbsent(owning.name(), k -> new HashMap<>()) - .put(file, owning.remotePath(basePath, file, blobKey)); + .computeIfAbsent(owningName, k -> new HashMap<>()) + .put(file, owning.remotePath(owningName, basePath, file, blobKey)); } for (Map.Entry> entry : perStrategy.entrySet()) { diff --git a/server/src/main/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactory.java index 02424b1e6e252..2551d20371273 100644 --- a/server/src/main/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactory.java @@ -29,7 +29,6 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -71,7 +70,7 @@ public DataFormatAwareStoreDirectory newDataFormatAwareStoreDirectory( ShardPath shardPath, IndexStorePlugin.DirectoryFactory localDirectoryFactory, Map checksumStrategies, - List storeStrategies, + Map storeStrategies, NativeStoreRepository nativeStore, boolean isWarm, RemoteSegmentStoreDirectory remoteDirectory, diff --git a/server/src/main/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectory.java b/server/src/main/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectory.java index dfcacf86772f1..00e6f470aa0dd 100644 --- a/server/src/main/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectory.java +++ b/server/src/main/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectory.java @@ -160,8 +160,8 @@ private boolean isFormatFile(String name) { if (shardPath.resolveIndex().resolve(name).getParent().equals(shardPath.resolveIndex())) { return false; } - StoreStrategy strategy = strategies.strategyFor(name); - if (strategy == null) { + StoreStrategyRegistry.Match match = strategies.matchFor(name); + if (match == null) { throw new IllegalStateException( "No StoreStrategy registered for file [" + name + "]. Ensure the format plugin is installed." ); diff --git a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java index be18bf3785921..58d72c5fb1a89 100644 --- a/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatRegistryTests.java @@ -325,8 +325,8 @@ public void testGetStoreStrategiesEmptyWhenNoPluggableDataformat() { DataFormatRegistry registry = new DataFormatRegistry(pluginsService); - List result = registry.getStoreStrategies(indexSettings); - assertTrue("Should return empty list when no pluggable_dataformat setting", result.isEmpty()); + Map result = registry.getStoreStrategies(indexSettings); + assertTrue("Should return empty map when no pluggable_dataformat setting", result.isEmpty()); } public void testGetStoreStrategiesEmptyWhenPluginReturnsNone() { @@ -347,10 +347,10 @@ public void testGetStoreStrategiesEmptyWhenPluginReturnsNone() { .build(); IndexSettings settingsWithFormat = new IndexSettings(IndexMetadata.builder("index").settings(settings).build(), settings); - // MockDataFormatPlugin does not override getStoreStrategy, so the default returns null - // and getStoreStrategies returns an empty list. - List result = registry.getStoreStrategies(settingsWithFormat); - assertTrue("Should return empty list when plugin provides no strategy", result.isEmpty()); + // MockDataFormatPlugin does not override getStoreStrategies, so the default returns + // an empty map. + Map result = registry.getStoreStrategies(settingsWithFormat); + assertTrue("Should return empty map when plugin provides no strategy", result.isEmpty()); } public void testGetStoreStrategiesEmptyWhenFormatNameNotRegistered() { @@ -371,8 +371,8 @@ public void testGetStoreStrategiesEmptyWhenFormatNameNotRegistered() { .build(); IndexSettings settingsWithFormat = new IndexSettings(IndexMetadata.builder("index").settings(settings).build(), settings); - List result = registry.getStoreStrategies(settingsWithFormat); - assertTrue("Should return empty list when format name not registered", result.isEmpty()); + Map result = registry.getStoreStrategies(settingsWithFormat); + assertTrue("Should return empty map when format name not registered", result.isEmpty()); } public void testGetPluginReturnsPluginForRegisteredFormat() { diff --git a/server/src/test/java/org/opensearch/storage/directory/GracefulDegradationTests.java b/server/src/test/java/org/opensearch/storage/directory/GracefulDegradationTests.java index 6b93c7af1e521..dfd20456ea6cb 100644 --- a/server/src/test/java/org/opensearch/storage/directory/GracefulDegradationTests.java +++ b/server/src/test/java/org/opensearch/storage/directory/GracefulDegradationTests.java @@ -106,7 +106,7 @@ public void testNoFormatPluginsCreatesValidStack() throws IOException { shardPath, localDirFactory, Map.of(), - java.util.List.of(), + java.util.Map.of(), org.opensearch.repositories.NativeStoreRepository.EMPTY, true, remoteDir, diff --git a/server/src/test/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactoryTests.java index ea33580212dac..b6f546ea1b4ff 100644 --- a/server/src/test/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/storage/directory/TieredDataFormatAwareStoreDirectoryFactoryTests.java @@ -131,7 +131,7 @@ public void testCreatesCorrectDirectoryStack() throws IOException { shardPath, localDirectoryFactory, Map.of(), - java.util.List.of(), + java.util.Map.of(), org.opensearch.repositories.NativeStoreRepository.EMPTY, true, remoteDirectory, @@ -164,7 +164,7 @@ public void testNoDoubleSubdirectoryAwareDirectoryWrapping() throws IOException shardPath, localDirectoryFactory, Map.of(), - java.util.List.of(), + java.util.Map.of(), org.opensearch.repositories.NativeStoreRepository.EMPTY, true, remoteDirectory, @@ -197,7 +197,7 @@ public void testEmptyFormatDirectoriesWhenNoPluginProvides() throws IOException shardPath, localDirectoryFactory, Map.of(), - java.util.List.of(), + java.util.Map.of(), org.opensearch.repositories.NativeStoreRepository.EMPTY, true, remoteDirectory, diff --git a/server/src/test/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectoryTests.java b/server/src/test/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectoryTests.java index b6bdc757456b1..1500ab9a0263a 100644 --- a/server/src/test/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectoryTests.java +++ b/server/src/test/java/org/opensearch/storage/directory/TieredSubdirectoryAwareDirectoryTests.java @@ -38,7 +38,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.HashSet; -import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Supplier; @@ -136,7 +136,7 @@ private WithRegistry buildDirectoryWithParquetFormat(NativeFileRegistry nativeRe shardPath.getShardId(), true, NativeStoreRepository.EMPTY, - List.of(parquet), + Map.of("parquet", parquet), remoteSegmentStoreDirectory ); TieredSubdirectoryAwareDirectory dir = new TieredSubdirectoryAwareDirectory( @@ -460,7 +460,7 @@ public void testConstructorFailureClosesStrategyRegistry() throws IOException { shardPath.getShardId(), true, NativeStoreRepository.EMPTY, - List.of(parquet), + Map.of("parquet", parquet), remoteSegmentStoreDirectory ); @@ -586,11 +586,6 @@ private static final class TestParquetStrategy implements StoreStrategy { this.factory = factory; } - @Override - public String name() { - return "parquet"; - } - @Override public Optional nativeFileRegistry() { return Optional.of(factory); diff --git a/server/src/test/java/org/opensearch/storage/directory/WarmShardDirectoryStackTests.java b/server/src/test/java/org/opensearch/storage/directory/WarmShardDirectoryStackTests.java index cd7007a6123a3..d6bf47f7bfe8f 100644 --- a/server/src/test/java/org/opensearch/storage/directory/WarmShardDirectoryStackTests.java +++ b/server/src/test/java/org/opensearch/storage/directory/WarmShardDirectoryStackTests.java @@ -43,7 +43,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Supplier; @@ -127,7 +127,7 @@ public void testWarmDirectoryStackCreationAndWrite() throws IOException { shardPath, localDirFactory, java.util.Map.of(), - List.of(), // no strategies + java.util.Map.of(), // no strategies NativeStoreRepository.EMPTY, true, remoteDir, @@ -161,11 +161,6 @@ public void testWarmDirectoryStackWithFormatStrategy() throws IOException { NativeFileRegistry nativeRegistry = mock(NativeFileRegistry.class); NativeFileRegistryFactory factory = (sid, isWarm, repo) -> nativeRegistry; StoreStrategy parquet = new StoreStrategy() { - @Override - public String name() { - return "parquet"; - } - @Override public Optional nativeFileRegistry() { return Optional.of(factory); @@ -176,7 +171,7 @@ public Optional nativeFileRegistry() { shardPath.getShardId(), true, NativeStoreRepository.EMPTY, - List.of(parquet), + Map.of("parquet", parquet), remoteDir );