Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMapSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMapSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMapSerializer;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessageSerializer;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
Expand Down Expand Up @@ -555,8 +553,6 @@ public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
factory.register(508, GroupPartitionIdPair::new, new GroupPartitionIdPairSerializer());
factory.register(510, IgniteDhtPartitionHistorySuppliersMap::new,
new IgniteDhtPartitionHistorySuppliersMapSerializer());
factory.register(513, IgniteDhtPartitionsToReloadMap::new,
new IgniteDhtPartitionsToReloadMapSerializer());
factory.register(517, GridPartitionStateMap::new, new GridPartitionStateMapSerializer());
factory.register(518, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer());
factory.register(519, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
Expand Down Expand Up @@ -1350,7 +1349,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload
) {
Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();

Expand All @@ -1371,7 +1370,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload,
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload,
Collection<CacheGroupContext> grps
) {
AffinityTopologyVersion ver = exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte

/** */
@GridToStringExclude
private final IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap();
private final Map<UUID, Map<Integer, Set<Integer>>> partsToReload = new ConcurrentHashMap<>();

/** */
private final AtomicBoolean done = new AtomicBoolean();
Expand Down Expand Up @@ -3425,8 +3425,13 @@ private void resetOwnersByCounter(GridDhtPartitionTopology top,
UUID nodeId = e.getKey();
Set<Integer> parts = e.getValue();

for (int part : parts)
partsToReload.put(nodeId, top.groupId(), part);
for (int part : parts) {
Map<Integer, Set<Integer>> nodeMap = partsToReload.computeIfAbsent(nodeId, k -> new ConcurrentHashMap<>());

Set<Integer> partsToReload = nodeMap.computeIfAbsent(top.groupId(), k -> new HashSet<>());

partsToReload.add(part);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
Expand Down Expand Up @@ -83,7 +84,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Order(4)
@Compress
@GridToStringInclude
IgniteDhtPartitionsToReloadMap partsToReload;
Map<UUID, Map<Integer, Set<Integer>>> partsToReload;

/** Partition sizes. */
@Order(5)
Expand Down Expand Up @@ -145,7 +146,7 @@ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
@Nullable GridCacheVersion lastVer,
@NotNull AffinityTopologyVersion topVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload) {
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload) {
super(id, lastVer);

assert id == null || topVer.equals(id.topologyVersion());
Expand Down Expand Up @@ -336,7 +337,9 @@ public Set<Integer> partsToReload(UUID nodeId, int grpId) {
if (partsToReload == null)
return Collections.emptySet();

return partsToReload.get(nodeId, grpId);
Map<Integer, Set<Integer>> nodeMap = partsToReload.get(nodeId);

return nodeMap == null ? Collections.emptySet() : (Set<Integer>)F.emptyIfNull(nodeMap.get(grpId));
}

/**
Expand Down Expand Up @@ -448,7 +451,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) {
partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();

if (partsToReload == null)
partsToReload = new IgniteDhtPartitionsToReloadMap();
partsToReload = new ConcurrentHashMap<>();

if (errs == null)
errs = new HashMap<>();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPre
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.direct.state.DirectMessageState;
Expand All @@ -29,7 +30,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
Expand Down Expand Up @@ -108,13 +108,17 @@ public void testWriteReadHugeMessage() {
/** */
private GridDhtPartitionsFullMessage fullMessage() {
IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap();
Map<UUID, Map<Integer, Set<Integer>>> partsToReload = new ConcurrentHashMap<>();

for (int i = 0; i < 500; i++) {
UUID uuid = UUID.randomUUID();

partHistSuppliers.put(uuid, i, i + 1, i + 2);
partsToReload.put(uuid, i, i + 1);

Map<Integer, Set<Integer>> nodeMap = new ConcurrentHashMap<>();
nodeMap.put(i, Set.of(i + 1));

partsToReload.put(uuid, nodeMap);
}

return new GridDhtPartitionsFullMessage(null, null, new AffinityTopologyVersion(0), partHistSuppliers, partsToReload);
Expand All @@ -130,8 +134,8 @@ private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected, GridDhtP
for (Map.Entry<UUID, Map<GroupPartitionIdPair, Long>> entry : expHistSuppliers.entrySet())
assertEquals(entry.getValue(), actHistSuppliers.get(entry.getKey()));

Map<UUID, Map<Integer, Set<Integer>>> expPartsToReload = U.field((Object)U.field(expected, "partsToReload"), "map");
Map<UUID, Map<Integer, Set<Integer>>> actPartsToReload = U.field((Object)U.field(actual, "partsToReload"), "map");
Map<UUID, Map<Integer, Set<Integer>>> expPartsToReload = U.field(expected, "partsToReload");
Map<UUID, Map<Integer, Set<Integer>>> actPartsToReload = U.field(actual, "partsToReload");

assertEquals(expPartsToReload.size(), actPartsToReload.size());

Expand Down
Loading