diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index edeef1147bd58..bff61910aa4c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -191,8 +191,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMapSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMapSerializer; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMapSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMapSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage; @@ -553,8 +551,6 @@ public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register(507, IgniteDhtPartitionCountersMap::new, new IgniteDhtPartitionCountersMapSerializer()); factory.register(508, GroupPartitionIdPair::new, new GroupPartitionIdPairSerializer()); - factory.register(510, IgniteDhtPartitionHistorySuppliersMap::new, - new IgniteDhtPartitionHistorySuppliersMapSerializer()); factory.register(513, IgniteDhtPartitionsToReloadMap::new, new IgniteDhtPartitionsToReloadMapSerializer()); factory.register(517, GridPartitionStateMap::new, new GridPartitionStateMapSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 55ae24bbaa3a7..ac778ae1a7e26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -87,7 +87,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask; @@ -1349,7 +1349,7 @@ private void sendAllPartitions( public GridDhtPartitionsFullMessage createPartitionsFullMessage( @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, - @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, + @Nullable Map> partHistSuppliers, @Nullable IgniteDhtPartitionsToReloadMap partsToReload ) { Collection grps = cctx.cache().cacheGroups(); @@ -1370,7 +1370,7 @@ public GridDhtPartitionsFullMessage createPartitionsFullMessage( public GridDhtPartitionsFullMessage createPartitionsFullMessage( @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, - @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, + @Nullable Map> partHistSuppliers, @Nullable IgniteDhtPartitionsToReloadMap partsToReload, Collection grps ) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e19ba2378e068..e04ee0f793f17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -324,7 +324,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ @GridToStringExclude - private final IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); + private Map> partHistSuppliers = new ConcurrentHashMap<>(); /** Set of nodes that cannot be used for wal rebalancing due to some reason. */ private final Set exclusionsFromHistoricalRebalance = ConcurrentHashMap.newKeySet(); @@ -587,7 +587,20 @@ public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) { * @return List of IDs of history supplier nodes or empty list if these doesn't exist. */ public List partitionHistorySupplier(int grpId, int partId, long cntrSince) { - List histSuppliers = partHistSuppliers.getSupplier(grpId, partId, cntrSince); + List histSuppliers; + + if (partHistSuppliers == null) + histSuppliers = Collections.emptyList(); + else { + histSuppliers = new ArrayList<>(); + + for (Map.Entry> entry : partHistSuppliers.entrySet()) { + Long historyCounter = entry.getValue().get(new GroupPartitionIdPair(grpId, partId)); + + if (historyCounter != null && historyCounter <= cntrSince) + histSuppliers.add(entry.getKey()); + } + } histSuppliers.removeIf(exclusionsFromHistoricalRebalance::contains); @@ -2436,7 +2449,7 @@ private String exchangeTimingsLogMessage(String header, List timings) { // Create and destroy caches and cache proxies. cctx.cache().onExchangeDone(this, err); - Map locReserved = partHistSuppliers.getReservations(cctx.localNodeId()); + Map locReserved = partHistSuppliers.get(cctx.localNodeId()); if (locReserved != null) { boolean success = cctx.database().reserveHistoryForPreloading(locReserved); @@ -3537,7 +3550,9 @@ private void findCounterForReservation( break; if (preferWalRebalance || maxOwnerCntr - ceilingMinReserved < ownerSize) { - partHistSuppliers.put(ownerId, grpId, p, ceilingMinReserved); + Map nodeMap = partHistSuppliers.computeIfAbsent(ownerId, k -> new ConcurrentHashMap<>()); + + nodeMap.put(new GroupPartitionIdPair(grpId, p), ceilingMinReserved); haveHistory.add(p); @@ -4628,8 +4643,8 @@ private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPa assert partHistSuppliers.isEmpty(); - partHistSuppliers.putAll(msg.partitionHistorySuppliers() != null ? msg.partitionHistorySuppliers() : - IgniteDhtPartitionHistorySuppliersMap.empty()); + if (msg.partitionHistorySuppliers() != null) + partHistSuppliers = msg.partitionHistorySuppliers(); // Reserve at least 2 threads for system operations. int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 9ee1835e1a913..f95bd4090fdb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.IgniteCheckedException; @@ -77,7 +78,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Order(3) @Compress @GridToStringInclude - IgniteDhtPartitionHistorySuppliersMap partHistSuppliers; + Map> partHistSuppliers; /** Partitions that must be cleared and re-loaded. */ @Order(4) @@ -144,7 +145,7 @@ public GridDhtPartitionsFullMessage() { public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, @NotNull AffinityTopologyVersion topVer, - @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, + @Nullable Map> partHistSuppliers, @Nullable IgniteDhtPartitionsToReloadMap partsToReload) { super(id, lastVer); @@ -325,7 +326,7 @@ public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) { /** * @return Partitions history suppliers. */ - public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() { + public Map> partitionHistorySuppliers() { return partHistSuppliers; } @@ -445,7 +446,7 @@ public void topologyVersion(AffinityTopologyVersion topVer) { partCntrs = new IgniteDhtPartitionCountersMap(); if (partHistSuppliers == null) - partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); + partHistSuppliers = new ConcurrentHashMap<>(); if (partsToReload == null) partsToReload = new IgniteDhtPartitionsToReloadMap(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java deleted file mode 100644 index 77e7dcb1f061e..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public class IgniteDhtPartitionHistorySuppliersMap implements Message { - /** */ - private static final IgniteDhtPartitionHistorySuppliersMap EMPTY = new IgniteDhtPartitionHistorySuppliersMap(); - - /** */ - @Order(0) - Map> map; - - /** - * @return Empty map. - */ - public static IgniteDhtPartitionHistorySuppliersMap empty() { - return EMPTY; - } - - /** - * @param grpId Group ID. - * @param partId Partition ID. - * @param cntrSince Partition update counter since history supplying is requested. - * @return List of supplier UUIDs or empty list if haven't these. - */ - public synchronized List getSupplier(int grpId, int partId, long cntrSince) { - if (map == null) - return Collections.emptyList(); - - List suppliers = new ArrayList<>(); - - for (Map.Entry> e : map.entrySet()) { - UUID supplierNode = e.getKey(); - - Long historyCounter = e.getValue().get(new GroupPartitionIdPair(grpId, partId)); - - if (historyCounter != null && historyCounter <= cntrSince) - suppliers.add(supplierNode); - } - - return suppliers; - } - - /** - * @param nodeId Node ID to check. - * @return Reservations for the given node. - */ - @Nullable public synchronized Map getReservations(UUID nodeId) { - if (map == null) - return null; - - return map.get(nodeId); - } - - /** - * @param nodeId Node ID. - * @param grpId Cache group ID. - * @param partId Partition ID. - * @param cntr Partition counter. - */ - public synchronized void put(UUID nodeId, int grpId, int partId, long cntr) { - if (map == null) - map = new HashMap<>(); - - Map nodeMap = map.computeIfAbsent(nodeId, k -> new HashMap<>()); - - nodeMap.put(new GroupPartitionIdPair(grpId, partId), cntr); - } - - /** - * @return {@code True} if empty. - */ - public synchronized boolean isEmpty() { - return map == null || map.isEmpty(); - } - - /** - * @param that Other map to put. - */ - public synchronized void putAll(IgniteDhtPartitionHistorySuppliersMap that) { - map = that.map; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteDhtPartitionHistorySuppliersMap.class, this); - } - -} diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index f16695c2d12b8..a05691b52e2c8 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1193,7 +1193,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPre org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap -org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java index fb96d7da32ed4..953f0ebf6a57a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.direct.state.DirectMessageState; @@ -28,7 +29,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; @@ -107,13 +107,13 @@ public void testWriteReadHugeMessage() { /** */ private GridDhtPartitionsFullMessage fullMessage() { - IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); + Map> partHistSuppliers = new ConcurrentHashMap<>(); IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap(); for (int i = 0; i < 500; i++) { UUID uuid = UUID.randomUUID(); - partHistSuppliers.put(uuid, i, i + 1, i + 2); + partHistSuppliers.put(uuid, Map.of(new GroupPartitionIdPair(i, i + 1), i + 2L)); partsToReload.put(uuid, i, i + 1); } @@ -122,8 +122,8 @@ private GridDhtPartitionsFullMessage fullMessage() { /** */ private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected, GridDhtPartitionsFullMessage actual) { - Map> expHistSuppliers = U.field(expected.partitionHistorySuppliers(), "map"); - Map> actHistSuppliers = U.field(actual.partitionHistorySuppliers(), "map"); + Map> expHistSuppliers = expected.partitionHistorySuppliers(); + Map> actHistSuppliers = actual.partitionHistorySuppliers(); assertEquals(expHistSuppliers.size(), actHistSuppliers.size());