Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.Plugin;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -127,39 +126,25 @@ public Map<String, Supplier<DataFormatDescriptor>> getFormatDescriptors(

/**
* Returns the store strategies from every participating sub-format plugin
* (primary + secondary). Sub-plugins that do not implement
* {@link DataFormatPlugin#getStoreStrategy()} contribute nothing.
* (primary + secondary), keyed by format name. Mirrors {@link #getFormatDescriptors}:
* each participating format is resolved through the registry, which delegates
* to the sub-plugin without re-entering this composite.
*/
@Override
public List<StoreStrategy> getStoreStrategies(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) {
public Map<String, StoreStrategy> getStoreStrategies(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) {
Settings settings = indexSettings.getSettings();
String primaryFormatName = PRIMARY_DATA_FORMAT.get(settings);
List<String> secondaryFormatNames = SECONDARY_DATA_FORMATS.get(settings);

List<StoreStrategy> out = new ArrayList<>();
collectStrategy(dataFormatRegistry, primaryFormatName, out, indexSettings);
for (String secondary : secondaryFormatNames) {
collectStrategy(dataFormatRegistry, secondary, out, indexSettings);
Map<String, StoreStrategy> strategies = new HashMap<>();
if (primaryFormatName != null && primaryFormatName.isEmpty() == false) {
strategies.putAll(dataFormatRegistry.getStoreStrategies(indexSettings, dataFormatRegistry.format(primaryFormatName)));
}
return List.copyOf(out);
}

private static void collectStrategy(
DataFormatRegistry registry,
String formatName,
List<StoreStrategy> out,
IndexSettings indexSettings
) {
if (formatName == null || formatName.isEmpty()) {
return;
}
DataFormatPlugin plugin = registry.getPlugin(formatName);
if (plugin == null) {
return;
}
List<StoreStrategy> contributed = plugin.getStoreStrategies(indexSettings, registry);
if (contributed != null) {
out.addAll(contributed);
for (String secondaryName : secondaryFormatNames) {
if (secondaryName != null && secondaryName.isEmpty() == false) {
strategies.putAll(dataFormatRegistry.getStoreStrategies(indexSettings, dataFormatRegistry.format(secondaryName)));
}
}
return Map.copyOf(strategies);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
import org.opensearch.index.engine.dataformat.StoreStrategy;
import org.opensearch.test.OpenSearchTestCase;
Expand Down Expand Up @@ -95,9 +94,12 @@ public void testGetStoreStrategiesEmptyWhenNoSubPlugins() {
DataFormatRegistry registry = mock(DataFormatRegistry.class);

IndexSettings indexSettings = buildIndexSettings(Settings.builder().put("index.composite.primary_data_format", "parquet").build());
when(registry.getPlugin("parquet")).thenReturn(null);
// Registry resolves the format but the composite pipeline finds no plugin → empty map.
DataFormat parquetFormat = CompositeTestHelper.stubFormat("parquet", 2, java.util.Set.of());
when(registry.format("parquet")).thenReturn(parquetFormat);
when(registry.getStoreStrategies(indexSettings, parquetFormat)).thenReturn(Map.of());

List<StoreStrategy> result = plugin.getStoreStrategies(indexSettings, registry);
Map<String, StoreStrategy> result = plugin.getStoreStrategies(indexSettings, registry);
assertTrue("Should return empty when no sub-plugin found", result.isEmpty());
}

Expand All @@ -107,15 +109,14 @@ public void testGetStoreStrategiesCollectsFromPrimaryPlugin() {

IndexSettings indexSettings = buildIndexSettings(Settings.builder().put("index.composite.primary_data_format", "parquet").build());

DataFormatPlugin subPlugin = mock(DataFormatPlugin.class);
DataFormat parquetFormat = CompositeTestHelper.stubFormat("parquet", 2, java.util.Set.of());
StoreStrategy parquetStrategy = mock(StoreStrategy.class);
when(parquetStrategy.name()).thenReturn("parquet");
when(subPlugin.getStoreStrategies(indexSettings, registry)).thenReturn(List.of(parquetStrategy));
when(registry.getPlugin("parquet")).thenReturn(subPlugin);
when(registry.format("parquet")).thenReturn(parquetFormat);
when(registry.getStoreStrategies(indexSettings, parquetFormat)).thenReturn(Map.of("parquet", parquetStrategy));

List<StoreStrategy> result = plugin.getStoreStrategies(indexSettings, registry);
Map<String, StoreStrategy> result = plugin.getStoreStrategies(indexSettings, registry);
assertEquals(1, result.size());
assertSame(parquetStrategy, result.get(0));
assertSame(parquetStrategy, result.get("parquet"));
}

public void testGetStoreStrategiesCollectsPrimaryAndSecondary() {
Expand All @@ -129,30 +130,30 @@ public void testGetStoreStrategiesCollectsPrimaryAndSecondary() {
.build()
);

DataFormatPlugin luceneSubPlugin = mock(DataFormatPlugin.class);
when(luceneSubPlugin.getStoreStrategies(indexSettings, registry)).thenReturn(List.of());

DataFormatPlugin parquetSubPlugin = mock(DataFormatPlugin.class);
DataFormat luceneFormat = CompositeTestHelper.stubFormat("lucene", 1, java.util.Set.of());
DataFormat parquetFormat = CompositeTestHelper.stubFormat("parquet", 2, java.util.Set.of());
StoreStrategy parquetStrategy = mock(StoreStrategy.class);
when(parquetStrategy.name()).thenReturn("parquet");
when(parquetSubPlugin.getStoreStrategies(indexSettings, registry)).thenReturn(List.of(parquetStrategy));

when(registry.getPlugin("lucene")).thenReturn(luceneSubPlugin);
when(registry.getPlugin("parquet")).thenReturn(parquetSubPlugin);
when(registry.format("lucene")).thenReturn(luceneFormat);
when(registry.format("parquet")).thenReturn(parquetFormat);
when(registry.getStoreStrategies(indexSettings, luceneFormat)).thenReturn(Map.of());
when(registry.getStoreStrategies(indexSettings, parquetFormat)).thenReturn(Map.of("parquet", parquetStrategy));

List<StoreStrategy> result = plugin.getStoreStrategies(indexSettings, registry);
Map<String, StoreStrategy> result = plugin.getStoreStrategies(indexSettings, registry);
assertEquals(1, result.size());
assertSame(parquetStrategy, result.get(0));
assertSame(parquetStrategy, result.get("parquet"));
}

public void testGetStoreStrategiesEmptyForDefaultPrimaryWithoutPlugin() {
CompositeDataFormatPlugin plugin = new CompositeDataFormatPlugin();
DataFormatRegistry registry = mock(DataFormatRegistry.class);

IndexSettings indexSettings = buildIndexSettings(Settings.EMPTY); // defaults: primary=lucene, secondary=[]
when(registry.getPlugin("lucene")).thenReturn(null);
DataFormat luceneFormat = CompositeTestHelper.stubFormat("lucene", 1, java.util.Set.of());
when(registry.format("lucene")).thenReturn(luceneFormat);
when(registry.getStoreStrategies(indexSettings, luceneFormat)).thenReturn(Map.of());

List<StoreStrategy> result = plugin.getStoreStrategies(indexSettings, registry);
Map<String, StoreStrategy> result = plugin.getStoreStrategies(indexSettings, registry);
assertTrue("Should return empty when lucene sub-plugin not found", result.isEmpty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* {@link ParquetIndexingEngine} instances created in {@link #indexingEngine}.
*
* <p>For tiered storage, returns a {@link ParquetStoreStrategy} from
* {@link #getStoreStrategy()}. The composite store layer takes it from there —
* {@link #getStoreStrategies}. The composite store layer takes it from there —
* construction of per-shard native registries, seeding from remote metadata,
* routing directory events, and closing native resources are all handled
* there. The plugin stays purely declarative.
Expand Down Expand Up @@ -120,8 +120,8 @@ public Map<String, Supplier<DataFormatDescriptor>> getFormatDescriptors(IndexSet
}

@Override
public StoreStrategy getStoreStrategy() {
return storeStrategy;
public Map<String, StoreStrategy> getStoreStrategies(IndexSettings indexSettings, DataFormatRegistry registry) {
return Map.of(ParquetDataFormat.PARQUET_DATA_FORMAT_NAME, storeStrategy);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.parquet.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.dataformat.NativeFileRegistry;
import org.opensearch.plugins.NativeStoreHandle;
import org.opensearch.repositories.NativeStoreRepository;

import java.io.IOException;
import java.util.Map;

/**
* Per-shard native file registry for the parquet format.
*
* <p>Wraps the Rust {@code TieredObjectStore} pointer that the parquet native
* reader consults to resolve file identifiers to blob paths. The store layer
* owns the lifecycle — this class is just the FFM boundary.
*
* <p>Read-only warm (current): every parquet file is REMOTE; the Java directory
* always reads from the remote store. The registry still receives
* seed/upload/remove events so the Rust {@code FileRegistry} stays consistent
* with the Java view.
*/
public final class ParquetNativeFileRegistry implements NativeFileRegistry {

    private static final Logger logger = LogManager.getLogger(ParquetNativeFileRegistry.class);

    /** Shard this registry was created for; used as context in log messages. */
    private final ShardId shardId;

    /**
     * Handle to the native store. Starts as {@link NativeStoreHandle#EMPTY}; the
     * assignment of a real handle is still a TODO in the constructor.
     */
    private NativeStoreHandle storeHandle = NativeStoreHandle.EMPTY;

    /**
     * Creates the registry for one shard.
     *
     * @param shardId   the shard this registry belongs to
     * @param isWarm    whether the shard is warm; nothing further happens when {@code false}
     * @param repoStore the native repository store; may be {@code null}, and is only
     *                  considered when it reports itself as live
     */
    public ParquetNativeFileRegistry(ShardId shardId, boolean isWarm, NativeStoreRepository repoStore) {
        this.shardId = shardId;
        // Same short-circuit order as a plain "isWarm && repoStore != null && repoStore.isLive()" check.
        if (isWarm == false || repoStore == null || repoStore.isLive() == false) {
            return;
        }
        // TODO: FFM — storePtr = ts_create_tiered_object_store(localPath)
        // TODO: FFM — ts_set_remote_store(storePtr, repoStore.getPointer())
        // this.storeHandle = new NativeStoreHandle(storePtr, TieredStorageNative::destroy);
        logger.debug("Created ParquetNativeFileRegistry for shard [{}]", shardId);
    }

    /**
     * Receives the initial file-to-remote-path mapping. Does nothing unless the
     * store handle is live.
     *
     * @param fileToRemotePath mapping from file to its remote path
     */
    @Override
    public void seed(Map<String, String> fileToRemotePath) {
        if (storeHandle.isLive()) {
            // TODO: FFM — ts_seed_file_locations(storeHandle.getPointer(), fileToRemotePath)
            logger.trace("seed: shard=[{}], count={}", shardId, fileToRemotePath.size());
        }
    }

    /**
     * Notification that a file was uploaded. Does nothing unless the store
     * handle is live.
     *
     * @param file       the uploaded file
     * @param remotePath the remote path the file was uploaded to
     */
    @Override
    public void onUploaded(String file, String remotePath) {
        if (storeHandle.isLive()) {
            // TODO: FFM — ts_after_sync_to_remote(storeHandle.getPointer(), file, remotePath)
            logger.trace("onUploaded: shard=[{}], file=[{}], remotePath=[{}]", shardId, file, remotePath);
        }
    }

    /**
     * Notification that a file was removed. Does nothing unless the store
     * handle is live.
     *
     * @param file the removed file
     */
    @Override
    public void onRemoved(String file) {
        if (storeHandle.isLive()) {
            // TODO: FFM — ts_remove_file(storeHandle.getPointer(), file)
            logger.trace("onRemoved: shard=[{}], file=[{}]", shardId, file);
        }
    }

    /**
     * Closes the underlying store handle.
     *
     * @throws IOException declared by the interface contract
     */
    @Override
    public void close() throws IOException {
        storeHandle.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,25 @@

import org.opensearch.index.engine.dataformat.NativeFileRegistryFactory;
import org.opensearch.index.engine.dataformat.StoreStrategy;
import org.opensearch.parquet.engine.ParquetDataFormat;

import java.util.Optional;

/**
* Store strategy for the parquet data format.
*
* <p>Declares the parquet file-naming convention (files live under the
* {@code "parquet/"} subdirectory) and provides a factory for the per-shard
* native file registry that tracks parquet files for the Rust reader.
* <p>Uses the default {@code owns} / {@code remotePath} behaviour inherited
* from {@link StoreStrategy} (files live under the {@code "parquet/"} prefix; blobs
* are laid out at {@code basePath + "parquet/" + blobKey}). The store layer
* supplies the format name when it invokes those methods, so the strategy
* itself does not carry the name.
*
* <p>All other behaviour (per-shard construction, remote-metadata seeding,
* directory routing, close ordering) is handled by the store layer. This
* class is a pure declaration.
* <p>Provides a factory for the per-shard native file registry that tracks
* parquet files for the Rust reader.
*/
public final class ParquetStoreStrategy implements StoreStrategy {

private static final NativeFileRegistryFactory FACTORY = ParquetNativeFileRegistry::new;

@Override
public String name() {
return ParquetDataFormat.PARQUET_DATA_FORMAT_NAME;
}

@Override
public Optional<NativeFileRegistryFactory> nativeFileRegistry() {
return Optional.of(FACTORY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ protected void closeInternal() {
&& this.dataFormatAwareStoreDirectoryFactory != null) {
// Warm + format-aware: resolve per-shard store strategies and native store,
// then let the factory build the StoreStrategyRegistry and directory stack.
List<StoreStrategy> storeStrategies = dataFormatRegistry.getStoreStrategies(
Map<String, StoreStrategy> storeStrategies = dataFormatRegistry.getStoreStrategies(
this.indexSettings
);
NativeStoreRepository nativeStore = resolveNativeStore(repositoriesService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.IndexSettings;

import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

Expand All @@ -25,7 +24,7 @@
* <li>{@link DataFormatDescriptor} via {@link #getFormatDescriptors} —
* <b>describes</b> the format (name, checksum strategy, static
* capabilities). Per-index value data.</li>
* <li>{@link StoreStrategy} via {@link #getStoreStrategy} —
* <li>{@link StoreStrategy} via {@link #getStoreStrategies} —
* <b>behavior</b> for how the format participates in the tiered store
* (file ownership, remote layout, optional native registry).</li>
* </ul>
Expand Down Expand Up @@ -65,35 +64,22 @@ default Map<String, Supplier<DataFormatDescriptor>> getFormatDescriptors(
}

/**
* Behavior describing how this format participates in the tiered store.
* Returns the strategies describing how this format participates in the tiered store,
* keyed by the format name the strategy applies to.
*
* <p>Plugins that need tiered-store support return a non-null strategy;
* all other plumbing (per-shard lifecycle, seeding, routing, close) is
* handled by the store layer.
* <p>Most plugins contribute a single entry (their own format). Composite plugins,
* which expose multiple formats per index, return one entry per participating format.
* A plugin that does not participate in the tiered store returns an empty map (default).
*
* @return a strategy for this format, or {@code null} if the format does
* not participate in the tiered store
*/
default StoreStrategy getStoreStrategy() {
return null;
}

/**
* Returns all strategies that apply for this plugin on the given index.
*
* <p>The default returns a singleton list containing
* {@link #getStoreStrategy()} when it is non-null, otherwise an empty
* list. Composite plugins (which expose multiple formats per index)
* override this to contribute the strategies for every participating
* format.
* <p>All cross-cutting work (per-shard lifecycle, seeding, routing, close) is handled
* by the store layer. Plugins only declare strategies here.
*
* @param indexSettings the index settings
* @param dataFormatRegistry the registry, used by composite plugins to
* resolve sub-format plugins
* @return the strategies that apply; never {@code null}
* @param dataFormatRegistry the registry, used by composite plugins to resolve
* sub-format plugins
* @return the strategies that apply, keyed by format name; never {@code null}
*/
default List<StoreStrategy> getStoreStrategies(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) {
StoreStrategy s = getStoreStrategy();
return s == null ? List.of() : List.of(s);
default Map<String, StoreStrategy> getStoreStrategies(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) {
return Map.of();
}
}
Loading
Loading