Skip to content

Commit c484fc5

Browse files
minal-kyadabeobal
authored andcommitted
Add tooling to repair system peers tables if inconsistent with cluster metadata
Patch by Minal Kyada; reviewed by Alex Petrov and Sam Tunnicliffe for CASSANDRA-21187
1 parent 7306343 commit c484fc5

6 files changed

Lines changed: 528 additions & 0 deletions

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
6.0-alpha2
2+
* Add tooling to repair system peers and peers_v2 if inconsistent with cluster metadata (CASSANDRA-21187)
23
* Fix a removed TTLed row re-appearance in a materialized view after a cursor compaction (CASSANDRA-21152)
34
* Rework ZSTD dictionary compression logic to create a trainer per training (CASSANDRA-21209)
45
Merged from 5.0:

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,7 @@ public enum CassandraRelevantProperties
587587
SUN_STDERR_ENCODING("sun.stderr.encoding"),
588588
SUN_STDOUT_ENCODING("sun.stdout.encoding"),
589589
SUPERUSER_SETUP_DELAY_MS("cassandra.superuser_setup_delay_ms", "10000"),
590+
SYNC_SYSTEM_PEERS_TABLES_AT_STARTUP("cassandra.sync_system_peers_tables_at_startup", "true"),
590591
SYSTEM_AUTH_DEFAULT_RF("cassandra.system_auth.default_rf", "1"),
591592
SYSTEM_DISTRIBUTED_DEFAULT_RF("cassandra.system_distributed.default_rf", "3"),
592593
SYSTEM_TRACES_DEFAULT_RF("cassandra.system_traces.default_rf", "2"),
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.db;
20+
21+
import java.net.InetAddress;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.Map;
25+
import java.util.Objects;
26+
import java.util.Set;
27+
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import org.apache.cassandra.cql3.UntypedResultSet;
32+
import org.apache.cassandra.db.marshal.UTF8Type;
33+
import org.apache.cassandra.db.virtual.PeersTable;
34+
import org.apache.cassandra.locator.InetAddressAndPort;
35+
import org.apache.cassandra.schema.SchemaConstants;
36+
import org.apache.cassandra.tcm.ClusterMetadata;
37+
import org.apache.cassandra.tcm.membership.Location;
38+
import org.apache.cassandra.tcm.membership.NodeAddresses;
39+
import org.apache.cassandra.tcm.membership.NodeId;
40+
import org.apache.cassandra.utils.FBUtilities;
41+
42+
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
43+
import static org.apache.cassandra.db.SystemKeyspace.LEGACY_PEERS;
44+
import static org.apache.cassandra.db.SystemKeyspace.PEERS_V2;
45+
46+
/**
47+
* Validator to ensure system.peers and system.peers_v2 tables match ClusterMetadata.
48+
* These tables are maintained for existing clients and tools which read from them to determine
49+
* topology and schema information, while Cassandra itself uses ClusterMetadata as the source of truth.
50+
*
51+
* This validator detects inconsistencies and automatically repairs them by synchronizing
52+
* the peers tables with the current ClusterMetadata.
53+
*
54+
* The system_views.peers virtual table provides a live view on the current ClusterMetadata
55+
* and includes all members of the cluster, unlike the legacy tables which exclude the local node.
56+
*/
57+
public final class SystemPeersValidator
58+
{
59+
private static final Logger logger = LoggerFactory.getLogger(SystemPeersValidator.class);
60+
private static final String SELECT_PEERS_V2_QUERY = String.format("SELECT * FROM %s.%s",
61+
SchemaConstants.SYSTEM_KEYSPACE_NAME, PEERS_V2);
62+
private static final String SELECT_PEERS_QUERY = String.format("SELECT * FROM %s.%s",
63+
SchemaConstants.SYSTEM_KEYSPACE_NAME, LEGACY_PEERS);
64+
private static final String DELETE_PEERS_V2_QUERY = String.format("DELETE FROM %s.%s WHERE peer = ? AND peer_port = ?",
65+
SchemaConstants.SYSTEM_KEYSPACE_NAME, PEERS_V2);
66+
private static final String DELETE_PEERS_QUERY = String.format("DELETE FROM %s.%s WHERE peer = ?",
67+
SchemaConstants.SYSTEM_KEYSPACE_NAME, LEGACY_PEERS);
68+
69+
public static void validateAndRepair(ClusterMetadata metadata)
70+
{
71+
Map<InetAddressAndPort, UntypedResultSet.Row> peersV2Rows = getPeersV2Rows();
72+
Map<InetAddress, UntypedResultSet.Row> peersRows = getPeersRows();
73+
74+
Map<InetAddressAndPort, NodeId> knownEndpoints = new HashMap<>();
75+
Set<InetAddress> knownAddresses = new HashSet<>();
76+
InetAddressAndPort localEndpoint = FBUtilities.getBroadcastAddressAndPort();
77+
for (InetAddressAndPort endpoint : metadata.directory.allJoinedEndpoints())
78+
{
79+
if (endpoint.equals(localEndpoint))
80+
continue;
81+
knownEndpoints.put(endpoint, metadata.directory.peerId(endpoint));
82+
knownAddresses.add(endpoint.getAddress());
83+
}
84+
85+
for (InetAddressAndPort endpoint : peersV2Rows.keySet())
86+
{
87+
if (!knownEndpoints.containsKey(endpoint))
88+
{
89+
logger.info("Removing stale peer {} from system.{}", endpoint, PEERS_V2);
90+
executeInternal(DELETE_PEERS_V2_QUERY, endpoint.getAddress(), endpoint.getPort());
91+
}
92+
}
93+
94+
for (InetAddress address : peersRows.keySet())
95+
{
96+
if (!knownAddresses.contains(address))
97+
{
98+
logger.info("Removing stale peer {} from system.{}", address, LEGACY_PEERS);
99+
executeInternal(DELETE_PEERS_QUERY, address);
100+
}
101+
}
102+
103+
for (Map.Entry<InetAddressAndPort, NodeId> entry : knownEndpoints.entrySet())
104+
{
105+
NodeId nodeId = entry.getValue();
106+
InetAddressAndPort endpoint = entry.getKey();
107+
UntypedResultSet.Row peersV2Row = peersV2Rows.get(endpoint);
108+
UntypedResultSet.Row peersRow = peersRows.get(endpoint.getAddress());
109+
110+
boolean peersV2NeedsRepair = needsRepair(peersV2Row, endpoint, PEERS_V2, nodeId, metadata);
111+
boolean peersNeedsRepair = needsRepair(peersRow, endpoint, LEGACY_PEERS, nodeId, metadata);
112+
113+
if (peersV2NeedsRepair || peersNeedsRepair)
114+
PeersTable.updateLegacyPeerTable(nodeId, metadata, metadata);
115+
}
116+
}
117+
118+
private static boolean needsRepair(UntypedResultSet.Row row, InetAddressAndPort endpoint,
119+
String table, NodeId nodeId, ClusterMetadata metadata)
120+
{
121+
if (row == null)
122+
{
123+
logger.info("Adding missing peer {} to system.{}", endpoint, table);
124+
return true;
125+
}
126+
boolean isEquivalent = PEERS_V2.equals(table)
127+
? peersV2RowIsEquivalent(row, nodeId, metadata)
128+
: peersRowIsEquivalent(row, nodeId, metadata);
129+
if (!isEquivalent)
130+
{
131+
logger.info("Repairing mismatched peer {} in system.{}: {}", endpoint, table, row);
132+
return true;
133+
}
134+
return false;
135+
}
136+
137+
private static boolean peersV2RowIsEquivalent(UntypedResultSet.Row row, NodeId nodeId, ClusterMetadata metadata)
138+
{
139+
NodeAddresses addresses = metadata.directory.getNodeAddresses(nodeId);
140+
return commonColumnsAreEquivalent(row, nodeId, addresses, metadata)
141+
&& row.has("preferred_port") && row.getInt("preferred_port") == addresses.broadcastAddress.getPort()
142+
&& row.has("native_port") && row.getInt("native_port") == addresses.nativeAddress.getPort();
143+
}
144+
145+
private static boolean peersRowIsEquivalent(UntypedResultSet.Row row, NodeId nodeId, ClusterMetadata metadata)
146+
{
147+
NodeAddresses addresses = metadata.directory.getNodeAddresses(nodeId);
148+
return commonColumnsAreEquivalent(row, nodeId, addresses, metadata);
149+
}
150+
151+
private static boolean commonColumnsAreEquivalent(UntypedResultSet.Row row, NodeId nodeId,
152+
NodeAddresses addresses, ClusterMetadata metadata)
153+
{
154+
Location location = metadata.directory.location(nodeId);
155+
// This column is differently named in the peers and peers_v2 tables
156+
String nativeAddressColumn = row.has("native_address") ? "native_address" : "rpc_address";
157+
158+
// Check existence first because row.getXXX can NPE if the column is not present
159+
return row.has("preferred_ip") && Objects.equals(row.getInetAddress("preferred_ip"), addresses.broadcastAddress.getAddress())
160+
&& row.has(nativeAddressColumn) && Objects.equals(row.getInetAddress(nativeAddressColumn), addresses.nativeAddress.getAddress())
161+
&& row.has("data_center") && Objects.equals(row.getString("data_center"), location.datacenter)
162+
&& row.has("rack") && Objects.equals(row.getString("rack"), location.rack)
163+
&& row.has("host_id") && Objects.equals(row.getUUID("host_id"), nodeId.toUUID())
164+
&& row.has("release_version") && Objects.equals(row.getString("release_version"), metadata.directory.version(nodeId).cassandraVersion.toString())
165+
&& row.has("schema_version") && Objects.equals(row.getUUID("schema_version"), metadata.schema.getVersion())
166+
&& row.has("tokens") && Objects.equals(row.getSet("tokens", UTF8Type.instance), SystemKeyspace.tokensAsSet(metadata.tokenMap.tokens(nodeId)));
167+
}
168+
169+
private static Map<InetAddressAndPort, UntypedResultSet.Row> getPeersV2Rows()
170+
{
171+
Map<InetAddressAndPort, UntypedResultSet.Row> rows = new HashMap<>();
172+
for (UntypedResultSet.Row row : executeInternal(SELECT_PEERS_V2_QUERY))
173+
rows.put(InetAddressAndPort.getByAddressOverrideDefaults(row.getInetAddress("peer"),
174+
row.getInt("peer_port")), row);
175+
return rows;
176+
}
177+
178+
private static Map<InetAddress, UntypedResultSet.Row> getPeersRows()
179+
{
180+
Map<InetAddress, UntypedResultSet.Row> rows = new HashMap<>();
181+
for (UntypedResultSet.Row row : executeInternal(SELECT_PEERS_QUERY))
182+
rows.put(row.getInetAddress("peer"), row);
183+
return rows;
184+
}
185+
}

src/java/org/apache/cassandra/service/StorageService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
import org.apache.cassandra.db.Keyspace;
104104
import org.apache.cassandra.db.SizeEstimatesRecorder;
105105
import org.apache.cassandra.db.SystemKeyspace;
106+
import org.apache.cassandra.db.SystemPeersValidator;
106107
import org.apache.cassandra.db.commitlog.CommitLog;
107108
import org.apache.cassandra.db.compaction.CompactionManager;
108109
import org.apache.cassandra.db.compaction.OperationType;
@@ -859,6 +860,9 @@ public void runMayThrow() throws InterruptedException, ExecutionException, IOExc
859860
RegistrationStatus.instance.onRegistration();
860861
Startup.maybeExecuteStartupTransformation(self);
861862

863+
if (CassandraRelevantProperties.SYNC_SYSTEM_PEERS_TABLES_AT_STARTUP.getBoolean())
864+
SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
865+
862866
try
863867
{
864868
if (joinRing)
@@ -5802,6 +5806,12 @@ public List<String> getTablesForKeyspace(String keyspace)
58025806
return Keyspace.open(keyspace).getColumnFamilyStores().stream().map(cfs -> cfs.name).collect(Collectors.toList());
58035807
}
58045808

5809+
@Override
5810+
public void validateAndRepairPeersMetadata()
5811+
{
5812+
SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
5813+
}
5814+
58055815
@Override
58065816
public List<String> mutateSSTableRepairedState(boolean repaired, boolean preview, String keyspace, List<String> tableNames)
58075817
{

src/java/org/apache/cassandra/service/StorageServiceMBean.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1410,6 +1410,15 @@ public void enableAuditLog(String loggerName, String includedKeyspaces, String e
14101410
/** Gets the names of all tables for the given keyspace */
14111411
public List<String> getTablesForKeyspace(String keyspace);
14121412

1413+
/**
1414+
* Validates that system.peers and system.peers_v2 are consistent with ClusterMetadata,
1415+
* inserting missing peer entries and removing stale ones. This runs automatically on
1416+
* startup but can be triggered manually if a discrepancy is suspected.
1417+
* <p>
1418+
* Note: mutates data in system.peers and system.peers_v2.
1419+
*/
1420+
public void validateAndRepairPeersMetadata();
1421+
14131422
/** Mutates the repaired state of all SSTables for the given SSTables */
14141423
public List<String> mutateSSTableRepairedState(boolean repaired, boolean preview, String keyspace, List<String> tables);
14151424

0 commit comments

Comments
 (0)