Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMapSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMapSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMapSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMapSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
Expand Down Expand Up @@ -553,8 +551,6 @@ public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
factory.register(507, IgniteDhtPartitionCountersMap::new,
new IgniteDhtPartitionCountersMapSerializer());
factory.register(508, GroupPartitionIdPair::new, new GroupPartitionIdPairSerializer());
factory.register(510, IgniteDhtPartitionHistorySuppliersMap::new,
new IgniteDhtPartitionHistorySuppliersMapSerializer());
factory.register(513, IgniteDhtPartitionsToReloadMap::new,
new IgniteDhtPartitionsToReloadMapSerializer());
factory.register(517, GridPartitionStateMap::new, new GridPartitionStateMapSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
Expand Down Expand Up @@ -1349,7 +1349,7 @@ private void sendAllPartitions(
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload
) {
Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
Expand All @@ -1370,7 +1370,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload,
Collection<CacheGroupContext> grps
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte

/** */
@GridToStringExclude
private final IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
private Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers = new ConcurrentHashMap<>();

/** Set of nodes that cannot be used for wal rebalancing due to some reason. */
private final Set<UUID> exclusionsFromHistoricalRebalance = ConcurrentHashMap.newKeySet();
Expand Down Expand Up @@ -587,7 +587,20 @@ public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) {
* @return List of IDs of history supplier nodes or empty list if these doesn't exist.
*/
public List<UUID> partitionHistorySupplier(int grpId, int partId, long cntrSince) {
List<UUID> histSuppliers = partHistSuppliers.getSupplier(grpId, partId, cntrSince);
List<UUID> histSuppliers;

if (partHistSuppliers == null)
histSuppliers = Collections.emptyList();
else {
histSuppliers = new ArrayList<>();

for (Map.Entry<UUID, Map<GroupPartitionIdPair, Long>> entry : partHistSuppliers.entrySet()) {
Long historyCounter = entry.getValue().get(new GroupPartitionIdPair(grpId, partId));

if (historyCounter != null && historyCounter <= cntrSince)
histSuppliers.add(entry.getKey());
}
}

histSuppliers.removeIf(exclusionsFromHistoricalRebalance::contains);

Expand Down Expand Up @@ -2436,7 +2449,7 @@ private String exchangeTimingsLogMessage(String header, List<String> timings) {
// Create and destroy caches and cache proxies.
cctx.cache().onExchangeDone(this, err);

Map<GroupPartitionIdPair, Long> locReserved = partHistSuppliers.getReservations(cctx.localNodeId());
Map<GroupPartitionIdPair, Long> locReserved = partHistSuppliers.get(cctx.localNodeId());

if (locReserved != null) {
boolean success = cctx.database().reserveHistoryForPreloading(locReserved);
Expand Down Expand Up @@ -3537,7 +3550,9 @@ private void findCounterForReservation(
break;

if (preferWalRebalance || maxOwnerCntr - ceilingMinReserved < ownerSize) {
partHistSuppliers.put(ownerId, grpId, p, ceilingMinReserved);
Map<GroupPartitionIdPair, Long> nodeMap = partHistSuppliers.computeIfAbsent(ownerId, k -> new ConcurrentHashMap<>());

nodeMap.put(new GroupPartitionIdPair(grpId, p), ceilingMinReserved);

haveHistory.add(p);

Expand Down Expand Up @@ -4628,8 +4643,8 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa

assert partHistSuppliers.isEmpty();

partHistSuppliers.putAll(msg.partitionHistorySuppliers() != null ? msg.partitionHistorySuppliers() :
IgniteDhtPartitionHistorySuppliersMap.empty());
if (msg.partitionHistorySuppliers() != null)
partHistSuppliers = msg.partitionHistorySuppliers();

// Reserve at least 2 threads for system operations.
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Order(3)
@Compress
@GridToStringInclude
IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers;

/** Partitions that must be cleared and re-loaded. */
@Order(4)
Expand Down Expand Up @@ -144,7 +145,7 @@ public GridDhtPartitionsFullMessage() {
public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
@Nullable GridCacheVersion lastVer,
@NotNull AffinityTopologyVersion topVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload) {
super(id, lastVer);

Expand Down Expand Up @@ -325,7 +326,7 @@ public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) {
/**
* @return Partitions history suppliers.
*/
public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() {
public Map<UUID, Map<GroupPartitionIdPair, Long>> partitionHistorySuppliers() {
return partHistSuppliers;
}

Expand Down Expand Up @@ -445,7 +446,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
partCntrs = new IgniteDhtPartitionCountersMap();

if (partHistSuppliers == null)
partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
partHistSuppliers = new ConcurrentHashMap<>();

if (partsToReload == null)
partsToReload = new IgniteDhtPartitionsToReloadMap();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPre
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
Expand Down Expand Up @@ -107,13 +107,13 @@ public void testWriteReadHugeMessage() {

/** */
private GridDhtPartitionsFullMessage fullMessage() {
IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers = new ConcurrentHashMap<>();
IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap();

for (int i = 0; i < 500; i++) {
UUID uuid = UUID.randomUUID();

partHistSuppliers.put(uuid, i, i + 1, i + 2);
partHistSuppliers.put(uuid, Map.of(new GroupPartitionIdPair(i, i + 1), i + 2L));
partsToReload.put(uuid, i, i + 1);
}

Expand All @@ -122,8 +122,8 @@ private GridDhtPartitionsFullMessage fullMessage() {

/** */
private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected, GridDhtPartitionsFullMessage actual) {
Map<UUID, Map<GroupPartitionIdPair, Long>> expHistSuppliers = U.field(expected.partitionHistorySuppliers(), "map");
Map<UUID, Map<GroupPartitionIdPair, Long>> actHistSuppliers = U.field(actual.partitionHistorySuppliers(), "map");
Map<UUID, Map<GroupPartitionIdPair, Long>> expHistSuppliers = expected.partitionHistorySuppliers();
Map<UUID, Map<GroupPartitionIdPair, Long>> actHistSuppliers = actual.partitionHistorySuppliers();

assertEquals(expHistSuppliers.size(), actHistSuppliers.size());

Expand Down
Loading