From d50478aaa048119c9f456030e022d3776b213d6f Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Fri, 17 Apr 2026 17:36:05 +0200 Subject: [PATCH] CASSSIDECAR-443: Support column types not parseable by Java 3.x driver --- ...assandraNodeOperationsIntegrationTest.java | 2 +- ...andraVectorSchemaRouteIntegrationTest.java | 70 +++++ .../server/data/QualifiedTableName.java | 21 ++ .../bridge/CassandraBridgeFactory.java | 1 - .../sidecar/config/DriverConfiguration.java | 8 + .../config/yaml/DriverConfigurationImpl.java | 23 +- .../sidecar/db/CQLSchemaAccessor.java | 98 +++++++ .../db/DriverUnsupportedSchemaCache.java | 277 ++++++++++++++++++ .../handlers/KeyspaceSchemaHandler.java | 17 +- .../sidecar/handlers/SchemaHandler.java | 7 +- .../sstableuploads/SSTableUploadHandler.java | 37 ++- .../ValidateTableExistenceHandler.java | 17 +- .../cassandra/sidecar/modules/CdcModule.java | 4 +- .../sidecar/modules/ConfigurationModule.java | 25 ++ .../multibindings/PeriodicTaskMapKeys.java | 1 + .../tasks/CassandraClusterSchemaMonitor.java | 12 +- .../testing/IntegrationTestModule.java | 9 +- .../apache/cassandra/sidecar/TestModule.java | 11 + .../sidecar/db/CQLSchemaAccessorTest.java | 107 +++++++ .../db/DriverUnsupportedSchemaCacheTest.java | 128 ++++++++ .../sidecar/handlers/SchemaHandlerTest.java | 12 + .../CassandraClusterSchemaMonitorTest.java | 10 +- spotbugs-exclude.xml | 4 + 23 files changed, 868 insertions(+), 33 deletions(-) create mode 100644 integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraVectorSchemaRouteIntegrationTest.java create mode 100644 server/src/main/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessor.java create mode 100644 server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java create mode 100644 server/src/test/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessorTest.java create mode 100644 server/src/test/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCacheTest.java diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java index 5566342d6..35e96b897 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java @@ -76,7 +76,7 @@ void testNodeDrainOperationSuccess() trustedClient().put(serverWrapper.serverPort, "localhost", ApiEndpointsV1.NODE_DRAIN_ROUTE) .send()); - assertThat(drainResponse.statusCode()).isEqualTo(OK.code()); + assertThat(drainResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); JsonObject responseBody = drainResponse.bodyAsJsonObject(); assertThat(responseBody).isNotNull(); diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraVectorSchemaRouteIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraVectorSchemaRouteIntegrationTest.java new file mode 100644 index 000000000..e759ef67e --- /dev/null +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraVectorSchemaRouteIntegrationTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import io.vertx.core.http.HttpResponseExpectation; +import org.apache.cassandra.sidecar.common.response.SchemaResponse; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; +import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion; + +import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +class CassandraVectorSchemaRouteIntegrationTest extends SharedClusterSidecarIntegrationTestBase +{ + protected static final int MIN_VERSION_WITH_VECTOR = 5; + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace("test_keyspace", Map.of("replication_factor", 1)); + createTestTable(new QualifiedName("test_keyspace", "int_table"), + "CREATE TABLE IF NOT EXISTS %s (a int, b int, PRIMARY KEY (a))"); + createTestTable(new QualifiedName("test_keyspace", "vector_table"), + "CREATE TABLE IF NOT EXISTS %s (a int, b vector, PRIMARY KEY (a))"); + } + + @Override + protected void beforeClusterProvisioning() + { + assumeThat(SimpleCassandraVersion.create(testVersion.version()).major) + .as("Vector type is supported since Cassandra 5.0") + .isGreaterThanOrEqualTo(MIN_VERSION_WITH_VECTOR); + } + + @Test + void testSchemaHandlerWithVectorTable() + { + String testRoute = "/api/v1/schema/keyspaces/test_keyspace"; + SchemaResponse response = getBlocking(trustedClient() + .get(serverWrapper.serverPort, "localhost", testRoute) + .send() + .expecting(HttpResponseExpectation.SC_OK)) + .bodyAsJson(SchemaResponse.class); + assertThat(response).isNotNull(); + assertThat(response.keyspace()).isEqualTo("test_keyspace"); + assertThat(response.schema()).contains("vector_table"); + } +} diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/data/QualifiedTableName.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/data/QualifiedTableName.java index 35379035b..ee504fbc2 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/data/QualifiedTableName.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/data/QualifiedTableName.java @@ -117,6 +117,27 @@ public String maybeQuotedTableName() return table; } + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (!(o instanceof QualifiedTableName)) + { + return false; + } + QualifiedTableName that = (QualifiedTableName) o; + return Objects.equals(keyspace, that.keyspace) && Objects.equals(table, that.table); + } + + @Override + public int hashCode() + { + return Objects.hash(keyspace, table); + } + /** * {@inheritDoc} */ diff --git a/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java b/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java index 466bc4286..9c99430a5 100644 --- a/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java +++ b/server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java @@ -30,7 +30,6 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; - import jakarta.inject.Singleton; import org.jetbrains.annotations.NotNull; diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java index 84b60f66c..412a82c1f 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java @@ -21,6 +21,8 @@ import java.net.InetSocketAddress; import java.util.List; +import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; + /** * The driver configuration to use when connecting to Cassandra */ @@ -61,4 +63,10 @@ public interface DriverConfiguration * Cassandra instance. */ SslConfiguration sslConfiguration(); + + /** + * @return Refresh interval of table schemas not supported by Java driver's metadata + * (not parseable, e.g. including vector type). + */ + SecondBoundConfiguration unsupportedTableSchemaRefreshTime(); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java index 8755cc00b..b4e1aafc2 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java @@ -21,8 +21,10 @@ import java.net.InetSocketAddress; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; import org.apache.cassandra.sidecar.config.DriverConfiguration; import org.apache.cassandra.sidecar.config.SslConfiguration; @@ -32,6 +34,8 @@ public class DriverConfigurationImpl implements DriverConfiguration { private static final int DEFAULT_NUM_CONNECTIONS = 1000; + private static final SecondBoundConfiguration DEFAULT_UNSUPPORTED_TABLE_SCHEMA_REFRESH_TIME = new SecondBoundConfiguration(5, TimeUnit.MINUTES); + @JsonProperty("contact_points") private final List contactPoints; @@ -50,9 +54,12 @@ public class DriverConfigurationImpl implements DriverConfiguration @JsonProperty("ssl") private final SslConfiguration sslConfiguration; + @JsonProperty("unsupported_table_schema_refresh_time") + private final SecondBoundConfiguration unsupportedTableSchemaRefreshTime; + public DriverConfigurationImpl() { - this(Collections.emptyList(), null, DEFAULT_NUM_CONNECTIONS, null, null, null); + this(Collections.emptyList(), null, DEFAULT_NUM_CONNECTIONS, null, null, null, DEFAULT_UNSUPPORTED_TABLE_SCHEMA_REFRESH_TIME); } public DriverConfigurationImpl(List contactPoints, @@ -60,7 +67,8 @@ public DriverConfigurationImpl(List contactPoints, int numConnections, String username, String password, - SslConfiguration sslConfiguration) + SslConfiguration sslConfiguration, + SecondBoundConfiguration unsupportedTableSchemaRefreshTime) { this.contactPoints = contactPoints; this.localDc = localDc; @@ -68,6 +76,7 @@ public DriverConfigurationImpl(List contactPoints, this.username = username; this.password = password; this.sslConfiguration = sslConfiguration; + this.unsupportedTableSchemaRefreshTime = unsupportedTableSchemaRefreshTime; } /** @@ -129,4 +138,14 @@ public SslConfiguration sslConfiguration() { return sslConfiguration; } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty("unsupported_table_schema_refresh_time") + public SecondBoundConfiguration unsupportedTableSchemaRefreshTime() + { + return unsupportedTableSchemaRefreshTime; + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessor.java b/server/src/main/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessor.java new file mode 100644 index 000000000..4b36cbf00 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessor.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.common.server.data.Name; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Component allowing to read CQL schema by executing {@code DESCRIBE} statement. + * TODO: Remove after upgrade to Java driver 4.x (CASSSIDECAR-421). + */ +@Singleton +public class CQLSchemaAccessor +{ + private final CQLSessionProvider sessionProvider; + + public CQLSchemaAccessor(CQLSessionProvider sessionProvider) + { + this.sessionProvider = sessionProvider; + } + + @NotNull + public Set getKeyspaces() + { + Session session = sessionProvider.get(); + Set keyspaces = new HashSet<>(); + List rows = session.execute("DESCRIBE KEYSPACES").all(); + for (Row row : rows) + { + Name keyspaceName = new Name(row.getString("keyspace_name")); + keyspaces.add(keyspaceName); + } + return keyspaces; + } + + @Nullable + public List getKeyspaceSchema(@NotNull Name keyspace) + { + Session session = sessionProvider.get(); + String statement = String.format("DESCRIBE KEYSPACE %s", keyspace.maybeQuotedName()); + return describe(session, statement); + } + + @Nullable + public List getTableSchema(@NotNull Name keyspace, @NotNull Name table) + { + Session session = sessionProvider.get(); + String statement = String.format("DESCRIBE TABLE %s.%s", keyspace.maybeQuotedName(), table.maybeQuotedName()); + return describe(session, statement); + } + + private List describe(Session session, String describeStatement) + { + try + { + List rows = session.execute(describeStatement).all(); + List result = new ArrayList<>(rows.size()); + for (Row row : rows) + { + String createStatement = row.getString("create_statement"); + result.add(createStatement); + } + return result; + } + catch (InvalidQueryException e) + { + // keyspace or table not found + return null; + } + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java b/server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java new file mode 100644 index 000000000..8e5d3bb48 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.google.inject.Singleton; +import io.vertx.core.Promise; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.common.server.data.Name; +import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.VisibleForTesting; + +/** + * The {@link DriverUnsupportedSchemaCache} class maintains cache of CQL schema for tables whose definition is not + * supported by driver natively. Java driver 3.x does not return {@link TableMetadata} for tables which schema + * could not be parsed. Since it does not currently support vector type, any table containing vector would not + * be visible. Cache is refreshed for the first time as soon as CQL connection is established or upon first lookup. + * Later, it is periodically refreshed according to configured schedule. + * TODO: Remove after upgrade to Java driver 4.x (CASSSIDECAR-421). + */ +@Singleton +public class DriverUnsupportedSchemaCache implements PeriodicTask +{ + private static final Logger LOGGER = LoggerFactory.getLogger(DriverUnsupportedSchemaCache.class); + private static final String STATEMENT_DELIMITER = "\n\n"; + + private final SidecarConfiguration sidecarConfiguration; + private final CQLSessionProvider sessionProvider; + private final CQLSchemaAccessor schemaAccessor; + + // cache contains only schemas unparseable by Java driver + // during every cache refresh, reference is being reassigned to achieve atomic swap + private volatile SortedMap schemaCache; + private volatile boolean initialized; // flag indicating whether cache has been populated at least once + + private PreparedStatement tableListStatement; + + public DriverUnsupportedSchemaCache(SidecarConfiguration sidecarConfiguration, + CQLSessionProvider sessionProvider) + { + this.sidecarConfiguration = sidecarConfiguration; + this.sessionProvider = sessionProvider; + this.schemaAccessor = new CQLSchemaAccessor(sessionProvider); + this.schemaCache = createCache(); + this.initialized = false; + } + + /** + * @return Schema for all tables across all keyspaces (only those not supported by Java driver). + */ + @NotNull + public String getFullSchema() + { + // we cannot know whether schema was updated after last + // periodic refresh, so results may be stale + refreshIfUninitialized(); + return getUnsupportedSchema(table -> true); + } + + /** + * @return Schema for all tables within given keyspaces (only those not supported by Java driver). + */ + @NotNull + public String getKeyspaceSchema(@NotNull Name keyspace) + { + // we cannot know whether schema was updated after last + // periodic refresh, so results may be stale + refreshIfUninitialized(); + return getUnsupportedSchema(table -> keyspace.equals(table.getKeyspace())); + } + + /** + * @return Schema for table not supported by Java driver's metadata, {@code null} otherwise. + */ + @Nullable + public String getTableSchema(@NotNull Name keyspace, @NotNull Name table) + { + return getTableSchema(keyspace, table, true); + } + + /** + * @param allowRefresh Flag indicating whether lookup of table schema via CQL query + * is allowed, when data not found in cache. + * @return Schema for table not supported by Java driver's metadata, {@code null} otherwise. + */ + @Nullable + public String getTableSchema(@NotNull Name keyspace, @NotNull Name table, boolean allowRefresh) + { + refreshIfUninitialized(); + QualifiedTableName name = new QualifiedTableName(keyspace, table); + String schema = schemaCache.get(name); + if (schema == null && allowRefresh) + { + // proactively fetch table schema, not to provide + // false-negative when table is not cached yet + // attention, method can block + schema = populateSchemaCache(schemaCache, name); + } + return schema; + } + + @Override + public DurationSpec delay() + { + return sidecarConfiguration.driverConfiguration().unsupportedTableSchemaRefreshTime(); + } + + @Override + public void execute(Promise promise) + { + try + { + refresh(false); + promise.tryComplete(); + } + catch (Throwable t) + { + promise.fail(t); + } + } + + private String getUnsupportedSchema(Predicate condition) + { + StringBuilder result = new StringBuilder(); + for (Map.Entry entry : schemaCache.entrySet()) + { + if (condition.test(entry.getKey())) + { + result.append(entry.getValue()).append(STATEMENT_DELIMITER); + } + } + return result.toString().trim(); + } + + private void refreshIfUninitialized() + { + if (!initialized) + { + refresh(true); + } + } + + public synchronized void refresh(boolean initializeOnly) + { + if (initialized && initializeOnly) + { + // cache has been already initialized, early exit + return; + } + try + { + Session session = sessionProvider.get(); + prepareStatements(session); + + Set tables = queryAllTables(session); + Set driverKnownTables = driverKnownTables(session); + + tables.removeAll(driverKnownTables); + + if (!tables.isEmpty()) + { + LOGGER.debug("Tables not supported by Java driver metadata: {}", tables); + SortedMap newCache = createCache(); + tables.forEach(table -> populateSchemaCache(newCache, table)); + // replacing cache, because some tables might have been removed in the meanwhile + schemaCache = newCache; + } + else + { + schemaCache.clear(); + } + + initialized = true; + } + catch (CassandraUnavailableException ignored) + { + LOGGER.debug("Not yet connect to Cassandra cluster"); + } + } + + private String populateSchemaCache(Map cache, QualifiedTableName table) + { + List cqlSchema = schemaAccessor.getTableSchema(table.getKeyspace(), table.table()); + if (cqlSchema != null) + { + String schema = String.join(STATEMENT_DELIMITER, cqlSchema); + cache.put(table, schema); + return schema; + } + return null; + } + + private Set queryAllTables(Session session) + { + List rows = session.execute(tableListStatement.bind()).all(); + return rows.stream() + .map(r -> new QualifiedTableName(r.getString("keyspace_name"), + r.getString("table_name"))) + .collect(Collectors.toSet()); + } + + private Set driverKnownTables(Session session) + { + Set result = new HashSet<>(); + Cluster cluster = session.getCluster(); + for (KeyspaceMetadata keyspace : cluster.getMetadata().getKeyspaces()) + { + for (TableMetadata table : keyspace.getTables()) + { + result.add(new QualifiedTableName(keyspace.getName(), table.getName())); + } + } + return result; + } + + private void prepareStatements(Session session) + { + if (tableListStatement == null) + { + tableListStatement = session.prepare("SELECT keyspace_name, table_name FROM system_schema.tables"); + } + } + + private SortedMap createCache() + { + // use sorted map for repeatable results when retrieving full schema + // synchronized, because different threads may indirectly add items to the map using getTableSchema() method + return Collections.synchronizedSortedMap(new TreeMap<>(Comparator.comparing(QualifiedTableName::toString))); + } + + @VisibleForTesting + void setInitialized(boolean initialized) + { + this.initialized = initialized; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/KeyspaceSchemaHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/KeyspaceSchemaHandler.java index c9446fe69..824f8af40 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/KeyspaceSchemaHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/KeyspaceSchemaHandler.java @@ -34,6 +34,7 @@ import org.apache.cassandra.sidecar.common.response.SchemaResponse; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.sidecar.utils.MetadataUtils; @@ -47,19 +48,24 @@ @Singleton public class KeyspaceSchemaHandler extends AbstractHandler implements AccessProtected { + private final DriverUnsupportedSchemaCache driverUnsupportedSchemaCache; + /** * Constructs a handler with the provided {@code metadataFetcher} * * @param metadataFetcher the interface to retrieve metadata * @param executorPools executor pools for blocking executions * @param validator a validator instance to validate Cassandra-specific input + * @param driverUnsupportedSchemaCache cache of unparseable table schemas by Java driver */ @Inject protected KeyspaceSchemaHandler(InstanceMetadataFetcher metadataFetcher, ExecutorPools executorPools, - CassandraInputValidator validator) + CassandraInputValidator validator, + DriverUnsupportedSchemaCache driverUnsupportedSchemaCache) { super(metadataFetcher, executorPools, validator); + this.driverUnsupportedSchemaCache = driverUnsupportedSchemaCache; } @Override @@ -94,7 +100,9 @@ private void handleWithMetadata(RoutingContext context, Name keyspace, Metadata { if (keyspace == null) { - SchemaResponse schemaResponse = new SchemaResponse(metadata.exportSchemaAsString()); + String fullSchema = metadata.exportSchemaAsString(); + fullSchema += driverUnsupportedSchemaCache.getFullSchema(); + SchemaResponse schemaResponse = new SchemaResponse(fullSchema); context.json(schemaResponse); return; } @@ -111,8 +119,9 @@ private void handleWithMetadata(RoutingContext context, Name keyspace, Metadata return; } - SchemaResponse schemaResponse = new SchemaResponse(keyspace.name(), - ksMetadata.exportAsString()); + String keyspaceSchema = ksMetadata.exportAsString(); + keyspaceSchema += driverUnsupportedSchemaCache.getKeyspaceSchema(keyspace); + SchemaResponse schemaResponse = new SchemaResponse(keyspace.name(), keyspaceSchema); context.json(schemaResponse); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/SchemaHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/SchemaHandler.java index ec3f65fa3..351115b80 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/SchemaHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/SchemaHandler.java @@ -27,6 +27,7 @@ import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; @@ -42,13 +43,15 @@ public class SchemaHandler extends KeyspaceSchemaHandler * @param metadataFetcher the interface to retrieve metadata * @param executorPools executor pools for blocking executions * @param validator a validator instance to validate Cassandra-specific input + * @param driverUnsupportedSchemaCache cache of unparseable table schemas by Java driver */ @Inject protected SchemaHandler(InstanceMetadataFetcher metadataFetcher, ExecutorPools executorPools, - CassandraInputValidator validator) + CassandraInputValidator validator, + DriverUnsupportedSchemaCache driverUnsupportedSchemaCache) { - super(metadataFetcher, executorPools, validator); + super(metadataFetcher, executorPools, validator, driverUnsupportedSchemaCache); } @Override diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java index d6c81e456..058fcd119 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java @@ -40,6 +40,7 @@ import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.config.SSTableUploadConfiguration; import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache; import org.apache.cassandra.sidecar.handlers.AbstractHandler; import org.apache.cassandra.sidecar.handlers.AccessProtected; import org.apache.cassandra.sidecar.handlers.data.SSTableUploadRequestParam; @@ -70,18 +71,20 @@ public class SSTableUploadHandler extends AbstractHandler validateKeyspaceAndTable(String host, logger.error(message); return Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message)); } - if (MetadataUtils.table(keyspaceMetadata, request.table()) == null) { - String message = String.format("Invalid table name '%s' supplied for keyspace '%s'", - request.table(), request.keyspace()); - logger.error(message); - return Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message)); + // check for tables that are not parseable by Java driver's metadata feature + boolean tableExists = driverUnsupportedSchemaCache.getTableSchema(request.keyspace(), request.table(), false) != null; + if (!tableExists) + { + String message = String.format("Invalid table name '%s' supplied for keyspace '%s'", + request.table(), request.keyspace()); + logger.error(message); + return Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message)); + } } return Future.succeededFuture(request); }); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java index 24a3ec9db..08cd65961 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java @@ -27,8 +27,10 @@ import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache; import org.apache.cassandra.sidecar.handlers.AbstractHandler; import org.apache.cassandra.sidecar.routes.RoutingContextUtils; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; @@ -46,12 +48,16 @@ @Singleton public class ValidateTableExistenceHandler extends AbstractHandler { + private final DriverUnsupportedSchemaCache driverUnsupportedSchemaCache; + @Inject public ValidateTableExistenceHandler(InstanceMetadataFetcher metadataFetcher, ExecutorPools executorPools, - CassandraInputValidator validator) + CassandraInputValidator validator, + DriverUnsupportedSchemaCache driverUnsupportedSchemaCache) { super(metadataFetcher, executorPools, validator); + this.driverUnsupportedSchemaCache = driverUnsupportedSchemaCache; } // It is a validator, and it does not assume values (keyspace and table) are present. @@ -97,7 +103,14 @@ protected void handleInternal(RoutingContext context, } TableMetadata tableMetadata = keyspaceMetadata.getTable(table); - if (tableMetadata == null) + boolean tableExists = tableMetadata != null; + if (!tableExists) + { + tableExists = driverUnsupportedSchemaCache.getTableSchema(new Name(keyspaceMetadata.getName()), + new Name(table), + false) != null; + } + if (!tableExists) { String errMsg = "Table " + input.tableName() + " was not found for keyspace " + input.keyspace(); context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, errMsg)); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java index ddf094f9c..fec382a4b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java @@ -70,6 +70,7 @@ import org.apache.cassandra.sidecar.coordination.TokenRingProvider; import org.apache.cassandra.sidecar.db.CdcConfigAccessor; import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; +import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache; import org.apache.cassandra.sidecar.db.KafkaConfigAccessor; import org.apache.cassandra.sidecar.db.TokenSplitConfigAccessor; import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor; @@ -128,10 +129,11 @@ PeriodicTask cdcRawDirectorySpaceCleanercPeriodicTask(CdcRawDirectorySpaceCleane @Singleton CassandraClusterSchemaMonitor cassandraClusterSchemaMonitorInstance(InstanceMetadataFetcher instanceMetadataFetcher, CdcDatabaseAccessor databaseAccessor, + DriverUnsupportedSchemaCache driverUnsupportedSchemaCache, SidecarConfiguration configuration, CassandraBridgeFactory cassandraBridgeFactory) { - return new CassandraClusterSchemaMonitor(instanceMetadataFetcher, databaseAccessor, configuration, cassandraBridgeFactory); + return new CassandraClusterSchemaMonitor(instanceMetadataFetcher, databaseAccessor, driverUnsupportedSchemaCache, configuration, cassandraBridgeFactory); } @ProvidesIntoMap diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java index 785104b6a..9019b8de6 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/ConfigurationModule.java @@ -32,6 +32,7 @@ import com.google.inject.AbstractModule; import com.google.inject.Provides; import com.google.inject.Singleton; +import com.google.inject.multibindings.ProvidesIntoMap; import com.google.inject.name.Named; import io.vertx.core.Vertx; import org.apache.cassandra.sidecar.adapters.base.CassandraFactory; @@ -54,12 +55,17 @@ import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; +import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache; import org.apache.cassandra.sidecar.db.schema.TableSchemaFetcher; import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory; import org.apache.cassandra.sidecar.metrics.instance.InstanceHealthMetrics; +import org.apache.cassandra.sidecar.modules.multibindings.KeyClassMapKey; +import org.apache.cassandra.sidecar.modules.multibindings.PeriodicTaskMapKeys; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; import org.apache.cassandra.sidecar.utils.CassandraVersionProvider; import static org.apache.cassandra.sidecar.common.server.utils.ByteUtils.bytesToHumanReadableBinaryPrefix; +import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP; /** @@ -126,6 +132,25 @@ CQLSessionProvider cqlSessionProvider(Vertx vertx, return cqlSessionProvider; } + @Provides + @Singleton + DriverUnsupportedSchemaCache driverUnsupportedSchemaCache(Vertx vertx, + SidecarConfiguration sidecarConfiguration, + CQLSessionProvider cqlSessionProvider) + { + DriverUnsupportedSchemaCache schemaCache = new DriverUnsupportedSchemaCache(sidecarConfiguration, cqlSessionProvider); + // trigger immediate cache population once Sidecar connects to Cassandra node + vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address(), message -> schemaCache.refresh(true)); + return schemaCache; + } + + @ProvidesIntoMap + @KeyClassMapKey(PeriodicTaskMapKeys.UnsupportedSchemaCacheTaskKey.class) + PeriodicTask driverUnsupportedSchemaCachePeriodicTask(DriverUnsupportedSchemaCache schemaCache) + { + return schemaCache; + } + @Provides @Singleton CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver, DriverUtils driverUtils, TableSchemaFetcher tableSchemaFetcher) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java index 293a57a22..fcf7055fa 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java @@ -37,4 +37,5 @@ interface CdcRawDirectorySpaceCleanerTaskKey extends ClassKey {} interface CdcPublisherTaskKey extends ClassKey {} interface CdcConfigRefresherNotifierKey extends ClassKey {} interface CassandraClusterSchemaTaskKey extends ClassKey {} + interface UnsupportedSchemaCacheTaskKey extends ClassKey {} } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java index 4d9df6c5d..f39e7e895 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitor.java @@ -33,7 +33,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.inject.Singleton; import io.vertx.core.Promise; import org.apache.cassandra.bridge.CassandraBridge; @@ -44,6 +43,7 @@ import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; +import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache; import org.apache.cassandra.sidecar.utils.CdcUtil; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.spark.data.CqlTable; @@ -69,6 +69,7 @@ public class CassandraClusterSchemaMonitor implements PeriodicTask private final AtomicReference> cdcTables = new AtomicReference<>(Collections.emptySet()); private final ConcurrentHashMap tableIdCache = new ConcurrentHashMap<>(); private final CdcDatabaseAccessor databaseAccessor; + private final DriverUnsupportedSchemaCache driverUnsupportedSchemaCache; private final CopyOnWriteArrayList schemaChangeListeners = new CopyOnWriteArrayList<>(); private final SidecarConfiguration sidecarConfiguration; private final InstanceMetadataFetcher instanceFetcher; @@ -76,12 +77,14 @@ public class CassandraClusterSchemaMonitor implements PeriodicTask public CassandraClusterSchemaMonitor(InstanceMetadataFetcher instanceFetcher, CdcDatabaseAccessor databaseAccessor, + DriverUnsupportedSchemaCache driverUnsupportedSchemaCache, SidecarConfiguration sidecarConfiguration, CassandraBridgeFactory cassandraBridgeFactory) { this.instanceFetcher = instanceFetcher; this.databaseAccessor = databaseAccessor; + this.driverUnsupportedSchemaCache = driverUnsupportedSchemaCache; this.sidecarConfiguration = sidecarConfiguration; this.cassandraBridgeFactory = cassandraBridgeFactory; } @@ -93,7 +96,7 @@ public void addSchemaChangeListener(Runnable listener) public void refresh() { - NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings()); + NodeSettings nodeSettings = instanceFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().nodeSettings()); CassandraBridge cassandraBridge = cassandraBridgeFactory.get(nodeSettings.releaseVersion()); CdcBridge cdcBridge = CdcBridgeFactory.getCdcBridge(cassandraBridge); @@ -101,6 +104,7 @@ public void refresh() { LOGGER.debug("Checking for schema changes..."); String fullSchemaText = databaseAccessor.fullSchema(); + fullSchemaText += driverUnsupportedSchemaCache.getFullSchema(); if (!fullSchemaText.equals(currSchemaText.get())) { LOGGER.info("Schema change detected, refreshing CDC tables"); @@ -177,10 +181,12 @@ public ScheduleDecision scheduleDecision() @VisibleForTesting static Set buildCdcTables(CdcDatabaseAccessor cdcDatabaseAccessor, + DriverUnsupportedSchemaCache driverUnsupportedSchemaCache, ConcurrentHashMap tableIdCache, @NotNull final CassandraBridge cassandraBridge) { - return buildCdcTables(cdcDatabaseAccessor.fullSchema(), + String fullSchema = cdcDatabaseAccessor.fullSchema() + driverUnsupportedSchemaCache.getFullSchema(); + return buildCdcTables(fullSchema, cdcDatabaseAccessor.partitioner(), tableIdCache, cdcDatabaseAccessor::getTableId, diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java index 7d985e2d4..88810fbb8 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java @@ -60,6 +60,7 @@ import org.apache.cassandra.sidecar.coordination.ClusterLeaseClaimTask; import org.apache.cassandra.sidecar.coordination.ElectorateMembership; import org.apache.cassandra.sidecar.db.SidecarLeaseDatabaseAccessor; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.modules.multibindings.ClassKey; @@ -70,6 +71,7 @@ import org.apache.cassandra.sidecar.tasks.ScheduleDecision; import org.jetbrains.annotations.NotNull; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP; /** @@ -227,7 +229,12 @@ public CQLSessionProvider cqlSessionProvider(Vertx vertx) @NotNull public Session get() { - return cassandraSidecarTestContext.session(); + Session session = cassandraSidecarTestContext.session(); + if (session == null) + { + throw new CassandraUnavailableException(CQL, "CQL session unavailable"); + } + return session; } @Override diff --git a/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java index 87c33d606..0494bbe92 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java @@ -64,6 +64,7 @@ import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration; import org.apache.cassandra.sidecar.config.yaml.ThrottleConfigurationImpl; +import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache; import org.apache.cassandra.sidecar.utils.CassandraVersionProvider; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; @@ -253,4 +254,14 @@ public DnsResolver dnsResolver() { return CassandraClientTokenRingProviderTest.mockDnsResolver(); } + + @Provides + @Singleton + public DriverUnsupportedSchemaCache driverUnsupportedSchemaCache() + { + DriverUnsupportedSchemaCache mock = mock(DriverUnsupportedSchemaCache.class); + when(mock.getFullSchema()).thenReturn(""); + when(mock.getKeyspaceSchema(any())).thenReturn(""); + return mock; + } } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessorTest.java b/server/src/test/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessorTest.java new file mode 100644 index 000000000..e9c6ad626 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/db/CQLSchemaAccessorTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db; + +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.common.server.data.Name; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.startsWith; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class CQLSchemaAccessorTest +{ + static final Name KEYSPACE = new Name("keyspace1"); + static final Name TABLE = new Name("table1"); + static final String SCHEMA = "CREATE TABLE keyspace1.table1 (a int, b int, PRIMARY KEY (a));"; + + CQLSchemaAccessor schemaAccessor; + + @BeforeEach + void setUp() + { + CQLSessionProvider sessionProvider = mockCQLSessionProvider(KEYSPACE.name(), TABLE.name(), SCHEMA); + schemaAccessor = new CQLSchemaAccessor(sessionProvider); + } + + @Test + void testGetKeyspaceNames() + { + assertThat(schemaAccessor.getKeyspaces()).containsExactly(KEYSPACE); + } + + @Test + void testGetExistingTable() + { + assertThat(schemaAccessor.getTableSchema(KEYSPACE, TABLE)).containsExactly(SCHEMA); + } + + @Test + void testGetNotExistingTable() + { + assertThat(schemaAccessor.getTableSchema(KEYSPACE, new Name("unknown"))).isNull(); + } + + static CQLSessionProvider mockCQLSessionProvider(String keyspace, String table, String schema) + { + Session session = mock(Session.class, RETURNS_DEEP_STUBS); + + when(session.execute(eq("DESCRIBE KEYSPACES"))).then(invocation -> { + ResultSet resultSet = mock(ResultSet.class); + Row row = mockRow(Map.of("keyspace_name", keyspace)); + when(resultSet.all()).thenReturn(List.of(row)); + return resultSet; + }); + + when(session.execute(startsWith("DESCRIBE TABLE"))).thenThrow(new InvalidQueryException("Unknown table")); + + String describeTable = String.format("DESCRIBE TABLE %s.%s", keyspace, table); + when(session.execute(eq(describeTable))).then(invocation -> { + ResultSet resultSet = mock(ResultSet.class); + Row row = mockRow(Map.of("create_statement", schema)); + when(resultSet.all()).thenReturn(List.of(row)); + return resultSet; + }); + + CQLSessionProvider cqlSession = mock(CQLSessionProvider.class); + when(cqlSession.get()).thenReturn(session); + + return cqlSession; + } + + static Row mockRow(Map values) + { + Row row = mock(Row.class); + values.forEach((k, v) -> when(row.getString(k)).thenReturn(v)); + return row; + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCacheTest.java b/server/src/test/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCacheTest.java new file mode 100644 index 000000000..0c9de1ebe --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCacheTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.common.server.data.Name; +import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; +import org.apache.cassandra.sidecar.config.DriverConfiguration; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; + +import static org.apache.cassandra.sidecar.db.CQLSchemaAccessorTest.mockRow; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class DriverUnsupportedSchemaCacheTest +{ + static final Name KEYSPACE = new Name("keyspace1"); + static final Name TABLE = new Name("table1"); + static final String SCHEMA = "CREATE TABLE keyspace1.table1 (a int, b vector, PRIMARY KEY (a));"; + + CQLSessionProvider sessionProvider; + Session session; + DriverUnsupportedSchemaCache schemaCache; + + @BeforeEach + void setUp() + { + sessionProvider = CQLSchemaAccessorTest.mockCQLSessionProvider(KEYSPACE.name(), TABLE.name(), SCHEMA); + session = sessionProvider.get(); + mockPreparedQuery(session, + "SELECT keyspace_name, table_name FROM system_schema.tables", + List.of(Map.of("keyspace_name", KEYSPACE.name(), "table_name", TABLE.name()))); + SidecarConfiguration sidecarConfiguration = mockSidecarConfiguration(); + schemaCache = new DriverUnsupportedSchemaCache(sidecarConfiguration, sessionProvider); + } + + @Test + void testImmediateSchemaLookup() + { + assertThat(schemaCache.getFullSchema()).isEqualTo(SCHEMA); + assertThat(schemaCache.getKeyspaceSchema(KEYSPACE)).isEqualTo(SCHEMA); + assertThat(schemaCache.getTableSchema(KEYSPACE, TABLE)).isEqualTo(SCHEMA); + + assertThat(schemaCache.getTableSchema(new Name("unknown"), TABLE)).isNull(); + assertThat(schemaCache.getTableSchema(KEYSPACE, new Name("unknown"))).isNull(); + } + + @Test + void testSchemaLookupAfterRefresh() + { + // mark cache as initialized, but effectively empty + schemaCache.setInitialized(true); + + assertThat(schemaCache.getFullSchema()).isEqualTo(""); + assertThat(schemaCache.getKeyspaceSchema(KEYSPACE)).isEqualTo(""); + + Promise p = Promise.promise(); + schemaCache.execute(p); + Future future = p.future(); + assertThat(future.succeeded()).isTrue(); + + // simulate Cassandra unavailability to verify data is taken from cache + when(sessionProvider.get()).thenThrow(new CassandraUnavailableException(CQL, "CQL unavailable")); + + assertThat(schemaCache.getFullSchema()).isEqualTo(SCHEMA); + assertThat(schemaCache.getKeyspaceSchema(KEYSPACE)).isEqualTo(SCHEMA); + assertThat(schemaCache.getTableSchema(KEYSPACE, TABLE)).isEqualTo(SCHEMA); + } + + static SidecarConfiguration mockSidecarConfiguration() + { + SidecarConfiguration sidecarConfiguration = mock(SidecarConfiguration.class); + DriverConfiguration driverConfiguration = mock(DriverConfiguration.class); + when(driverConfiguration.unsupportedTableSchemaRefreshTime()).thenReturn(new SecondBoundConfiguration(5, TimeUnit.SECONDS)); + when(sidecarConfiguration.driverConfiguration()).thenReturn(driverConfiguration); + return sidecarConfiguration; + } + + static void mockPreparedQuery(Session session, String statement, List> rows) + { + PreparedStatement preparedStatement = mock(PreparedStatement.class); + BoundStatement boundStatement = mock(BoundStatement.class); + when(session.prepare(eq(statement))).thenReturn(preparedStatement); + when(preparedStatement.bind()).thenReturn(boundStatement); + when(session.execute(eq(boundStatement))).then(invocation -> { + ResultSet resultSet = mock(ResultSet.class); + List mockRows = new ArrayList<>(); + rows.forEach(row -> mockRows.add(mockRow(row))); + when(resultSet.all()).thenReturn(mockRows); + return resultSet; + }); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/SchemaHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/SchemaHandlerTest.java index fbda6c963..e6435c6dc 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/SchemaHandlerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/SchemaHandlerTest.java @@ -53,12 +53,14 @@ import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.server.utils.IOUtils; +import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache; import org.apache.cassandra.sidecar.modules.SidecarModules; import org.apache.cassandra.sidecar.server.Server; import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -186,5 +188,15 @@ public InstancesMetadata instanceConfig() throws IOException return mockInstancesMetadata; } + + @Provides + @Singleton + public DriverUnsupportedSchemaCache driverUnsupportedSchemaCache() + { + DriverUnsupportedSchemaCache schemaAccessor = mock(DriverUnsupportedSchemaCache.class); + when(schemaAccessor.getFullSchema()).thenReturn(""); + when(schemaAccessor.getKeyspaceSchema(any())).thenReturn(""); + return schemaAccessor; + } } } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitorTest.java b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitorTest.java index 498a9e623..13d58f952 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitorTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CassandraClusterSchemaMonitorTest.java @@ -30,7 +30,6 @@ import org.junit.jupiter.api.Test; import io.vertx.core.Promise; - import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.CdcBridge; @@ -42,6 +41,7 @@ import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; +import org.apache.cassandra.sidecar.db.DriverUnsupportedSchemaCache; import org.apache.cassandra.sidecar.utils.CdcUtil; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.spark.data.CqlTable; @@ -49,7 +49,6 @@ import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.utils.CqlUtils; import org.apache.cassandra.spark.utils.TableIdentifier; - import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -73,6 +72,7 @@ class CassandraClusterSchemaMonitorTest private CassandraClusterSchemaMonitor clusterSchema; private InstanceMetadataFetcher mockInstanceFetcher; private CdcDatabaseAccessor mockDatabaseAccessor; + private DriverUnsupportedSchemaCache mockDriverUnsupportedSchemaCache; private SidecarConfiguration mockSidecarConfiguration; private ServiceConfiguration mockServiceConfiguration; private CdcConfiguration mockCdcConfiguration; @@ -102,6 +102,7 @@ void setup() { mockInstanceFetcher = mock(InstanceMetadataFetcher.class); mockDatabaseAccessor = mock(CdcDatabaseAccessor.class); + mockDriverUnsupportedSchemaCache = mock(DriverUnsupportedSchemaCache.class); mockSidecarConfiguration = mock(SidecarConfiguration.class); mockServiceConfiguration = mock(ServiceConfiguration.class); mockCdcConfiguration = mock(CdcConfiguration.class); @@ -129,6 +130,7 @@ void setup() // Setup database accessor when(mockDatabaseAccessor.fullSchema()).thenReturn(INITIAL_SCHEMA); + when(mockDriverUnsupportedSchemaCache.getFullSchema()).thenReturn(""); when(mockDatabaseAccessor.partitioner()).thenReturn(Partitioner.Murmur3Partitioner); when(mockDatabaseAccessor.getTableId(any(TableIdentifier.class))).thenReturn(UUID.randomUUID()); when(mockCassandraBridgeFactory.get(anyString())).thenReturn(mockCassandraBridge); @@ -136,6 +138,7 @@ void setup() clusterSchema = new CassandraClusterSchemaMonitor( mockInstanceFetcher, mockDatabaseAccessor, + mockDriverUnsupportedSchemaCache, mockSidecarConfiguration, mockCassandraBridgeFactory ); @@ -223,6 +226,7 @@ void testRefreshDetectsSchemaChangeAndUpdatesCdcTables() // Verify initial schema processing verify(mockDatabaseAccessor, times(1)).fullSchema(); + verify(mockDriverUnsupportedSchemaCache, times(1)).getFullSchema(); verify(mockCdcBridge, times(1)).updateCdcSchema(any(Set.class), eq(Partitioner.Murmur3Partitioner), any()); // Second refresh - should detect schema change and update @@ -230,6 +234,7 @@ void testRefreshDetectsSchemaChangeAndUpdatesCdcTables() // Verify schema change detection and update verify(mockDatabaseAccessor, times(2)).fullSchema(); + verify(mockDriverUnsupportedSchemaCache, times(2)).getFullSchema(); verify(mockCdcBridge, times(2)).updateCdcSchema(any(Set.class), eq(Partitioner.Murmur3Partitioner), any()); } } @@ -420,6 +425,7 @@ void testBuildCdcTablesWithDatabaseAccessor() Set result = CassandraClusterSchemaMonitor.buildCdcTables( mockDatabaseAccessor, + mockDriverUnsupportedSchemaCache, tableIdCache, mockCassandraBridge ); diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml index a6aca58dd..5a2e0aa2e 100644 --- a/spotbugs-exclude.xml +++ b/spotbugs-exclude.xml @@ -141,4 +141,8 @@ + + + +