diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index edeef1147bd58..1a9b6cd89a582 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -193,8 +193,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMapSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMapSerializer; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMapSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessageSerializer; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; @@ -555,8 +553,6 @@ public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register(508, GroupPartitionIdPair::new, new GroupPartitionIdPairSerializer()); factory.register(510, IgniteDhtPartitionHistorySuppliersMap::new, new IgniteDhtPartitionHistorySuppliersMapSerializer()); - factory.register(513, IgniteDhtPartitionsToReloadMap::new, - new IgniteDhtPartitionsToReloadMapSerializer()); factory.register(517, GridPartitionStateMap::new, new GridPartitionStateMapSerializer()); factory.register(518, GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer()); factory.register(519, GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 55ae24bbaa3a7..c06eeaad3935d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -88,7 +88,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask; @@ -1350,7 +1349,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, - @Nullable IgniteDhtPartitionsToReloadMap partsToReload + @Nullable Map>> partsToReload ) { Collection grps = cctx.cache().cacheGroups(); @@ -1371,7 +1370,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, - @Nullable IgniteDhtPartitionsToReloadMap partsToReload, + @Nullable Map>> partsToReload, Collection grps ) { AffinityTopologyVersion ver = exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e19ba2378e068..f26af16843a98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -340,7 +340,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ @GridToStringExclude - private final IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap(); + private final Map>> partsToReload = new ConcurrentHashMap<>(); /** */ private final AtomicBoolean done = new AtomicBoolean(); @@ -3425,8 +3425,13 @@ private void resetOwnersByCounter(GridDhtPartitionTopology top, UUID nodeId = e.getKey(); Set parts = e.getValue(); - for (int part : parts) - partsToReload.put(nodeId, top.groupId(), part); + for (int part : parts) { + Map> nodeMap = partsToReload.computeIfAbsent(nodeId, k -> new ConcurrentHashMap<>()); + + Set partsToReload = nodeMap.computeIfAbsent(top.groupId(), k -> new HashSet<>()); + + partsToReload.add(part); + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 9ee1835e1a913..2509c0fe89d45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.IgniteCheckedException; @@ -83,7 +84,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Order(4) @Compress @GridToStringInclude - IgniteDhtPartitionsToReloadMap partsToReload; + Map>> partsToReload; /** Partition sizes. */ @Order(5) @@ -145,7 +146,7 @@ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, @NotNull AffinityTopologyVersion topVer, @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, - @Nullable IgniteDhtPartitionsToReloadMap partsToReload) { + @Nullable Map>> partsToReload) { super(id, lastVer); assert id == null || topVer.equals(id.topologyVersion()); @@ -336,7 +337,9 @@ public Set partsToReload(UUID nodeId, int grpId) { if (partsToReload == null) return Collections.emptySet(); - return partsToReload.get(nodeId, grpId); + Map> nodeMap = partsToReload.get(nodeId); + + return nodeMap == null ? Collections.emptySet() : (Set)F.emptyIfNull(nodeMap.get(grpId)); } /** @@ -448,7 +451,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) { partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); if (partsToReload == null) - partsToReload = new IgniteDhtPartitionsToReloadMap(); + partsToReload = new ConcurrentHashMap<>(); if (errs == null) errs = new HashMap<>(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java deleted file mode 100644 index d8cc8d9674c84..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; - -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; - -/** - * Partition reload map. - */ -public class IgniteDhtPartitionsToReloadMap implements Message { - /** */ - @Order(0) - Map>> map; - - /** - * @param nodeId Node ID. - * @param cacheId Cache ID. - * @return Set of partitions to reload. - */ - public synchronized Set get(UUID nodeId, int cacheId) { - if (map == null) - return Collections.emptySet(); - - Map> nodeMap = map.get(nodeId); - - return nodeMap == null ? Collections.emptySet() : (Set)F.emptyIfNull(nodeMap.get(cacheId)); - } - - /** - * @param nodeId Node ID. - * @param cacheId Cache ID. - * @param partId Partition ID. - */ - public synchronized void put(UUID nodeId, int cacheId, int partId) { - if (map == null) - map = new HashMap<>(); - - Map> nodeMap = map.computeIfAbsent(nodeId, k -> new HashMap<>()); - - Set parts = nodeMap.computeIfAbsent(cacheId, k -> new HashSet<>()); - - parts.add(partId); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteDhtPartitionsToReloadMap.class, this); - } - -} diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index f16695c2d12b8..552cd942ffb5a 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1194,7 +1194,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPre org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap -org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java index fb96d7da32ed4..41d7b37e763b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.direct.state.DirectMessageState; @@ -29,7 +30,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -108,13 +108,17 @@ public void testWriteReadHugeMessage() { /** */ private GridDhtPartitionsFullMessage fullMessage() { IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); - IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap(); + Map>> partsToReload = new ConcurrentHashMap<>(); for (int i = 0; i < 500; i++) { UUID uuid = UUID.randomUUID(); partHistSuppliers.put(uuid, i, i + 1, i + 2); - partsToReload.put(uuid, i, i + 1); + + Map> nodeMap = new ConcurrentHashMap<>(); + nodeMap.put(i, Set.of(i + 1)); + + partsToReload.put(uuid, nodeMap); } return new GridDhtPartitionsFullMessage(null, null, new AffinityTopologyVersion(0), partHistSuppliers, partsToReload); @@ -130,8 +134,8 @@ private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected, GridDhtP for (Map.Entry> entry : expHistSuppliers.entrySet()) assertEquals(entry.getValue(), actHistSuppliers.get(entry.getKey())); - Map>> expPartsToReload = U.field((Object)U.field(expected, "partsToReload"), "map"); - Map>> actPartsToReload = U.field((Object)U.field(actual, "partsToReload"), "map"); + Map>> expPartsToReload = U.field(expected, "partsToReload"); + Map>> actPartsToReload = U.field(actual, "partsToReload"); assertEquals(expPartsToReload.size(), actPartsToReload.size());