package com.hazelcast.enterprise.wan.impl.replication;

import com.hazelcast.cluster.Address;
import com.hazelcast.enterprise.wan.impl.WanConsistencyCheckEvent;
import com.hazelcast.enterprise.wan.impl.WanSyncEvent;
import com.hazelcast.enterprise.wan.impl.connection.WanConnectionWrapper;
import com.hazelcast.enterprise.wan.impl.operation.MerkleTreeNodeValueComparison;
import com.hazelcast.enterprise.wan.impl.operation.WanMerkleTreeNodeCompareOperation;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.management.events.WanConsistencyCheckFinishedEvent;
import com.hazelcast.internal.management.events.WanConsistencyCheckIgnoredEvent;
import com.hazelcast.internal.management.events.WanConsistencyCheckStartedEvent;
import com.hazelcast.internal.management.events.WanMerkleSyncFinishedEvent;
import com.hazelcast.internal.management.events.WanSyncIgnoredEvent;
import com.hazelcast.internal.management.events.WanSyncProgressUpdateEvent;
import com.hazelcast.internal.management.events.WanSyncStartedEvent;
import com.hazelcast.internal.partition.IPartition;
import com.hazelcast.internal.partition.InternalPartition;
import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.internal.util.CollectionUtil;
import com.hazelcast.internal.util.MapUtil;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.ThreadUtil;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.internal.util.concurrent.IdleStrategy;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.MerkleTreeNodeEntries;
import com.hazelcast.map.impl.operation.MerkleTreeGetEntriesOperation;
import com.hazelcast.map.impl.operation.MerkleTreeGetEntryCountOperation;
import com.hazelcast.map.impl.operation.MerkleTreeNodeCompareOperationFactory;
import com.hazelcast.map.impl.wan.WanEnterpriseMapEvent;
import com.hazelcast.map.impl.wan.WanEnterpriseMapMerkleTreeNode;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.wan.impl.ConsistencyCheckResult;
import com.hazelcast.wan.impl.WanAntiEntropyEvent;
import com.hazelcast.wan.impl.WanReplicationService;
import com.hazelcast.wan.impl.WanSyncStats;
import com.hazelcast.wan.impl.WanSyncType;
import com.hazelcast.wan.impl.merkletree.MerkleTreeUtil;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:com/hazelcast/enterprise/wan/impl/replication/WanPublisherMerkleTreeSyncSupport.class */
public class WanPublisherMerkleTreeSyncSupport implements WanPublisherSyncSupport {
    private static final int WAN_TARGET_INVOCATION_DEADLINE_SECONDS = 10;
    private static final int WAN_TARGET_INVOCATION_MIN_ATTEMPTS = 10;
    private static final long WAN_TARGET_INVOCATION_BACKOFF_MIN_PARK = TimeUnit.MILLISECONDS.toNanos(1);
    private static final long WAN_TARGET_INVOCATION_BACKOFF_MAX_PARK = TimeUnit.MILLISECONDS.toNanos(100);
    private final NodeEngineImpl nodeEngine;
    private final MapService mapService;
    private final WanConfigurationContext configurationContext;
    private final ILogger logger;
    private final WanBatchPublisher publisher;
    private final ExecutorService updateSerializingExecutor;
    private final Map<String, ConsistencyCheckResult> lastConsistencyCheckResults = new ConcurrentHashMap();
    private final Map<String, WanSyncStats> lastSyncStats = new ConcurrentHashMap();
    private final Map<UUID, WanSyncContext<WanMerkleTreeSyncStats>> syncContextMap = new ConcurrentHashMap();
    private final IdleStrategy wanTargetInvocationIdleStrategy = new BackoffIdleStrategy(0, 0, WAN_TARGET_INVOCATION_BACKOFF_MIN_PARK, WAN_TARGET_INVOCATION_BACKOFF_MAX_PARK);

    /* JADX INFO: Access modifiers changed from: package-private */
    public WanPublisherMerkleTreeSyncSupport(Node node, WanConfigurationContext wanConfigurationContext, WanBatchPublisher wanBatchPublisher) {
        this.nodeEngine = (NodeEngineImpl) Preconditions.checkNotNull(node.getNodeEngine());
        this.mapService = (MapService) this.nodeEngine.getService(MapService.SERVICE_NAME);
        this.logger = (ILogger) Preconditions.checkNotNull(node.getLogger(getClass()));
        this.configurationContext = (WanConfigurationContext) Preconditions.checkNotNull(wanConfigurationContext);
        this.publisher = (WanBatchPublisher) Preconditions.checkNotNull(wanBatchPublisher);
        this.updateSerializingExecutor = Executors.newSingleThreadExecutor(runnable -> {
            return new Thread(runnable, ThreadUtil.createThreadName(node.hazelcastInstance.getName(), "wan-sync-stats-updater"));
        });
    }

    @Override // com.hazelcast.enterprise.wan.impl.replication.WanPublisherSyncSupport
    public void processEvent(WanConsistencyCheckEvent wanConsistencyCheckEvent) throws Exception {
        String objectName = wanConsistencyCheckEvent.getObjectName();
        if (!isMapWanReplicated(objectName)) {
            String str = "WAN consistency check requested for map " + objectName + " that is not configured for WAN replication";
            this.nodeEngine.getManagementCenterService().log(new WanConsistencyCheckIgnoredEvent(wanConsistencyCheckEvent.getUuid(), this.publisher.wanReplicationName, this.publisher.wanPublisherId, objectName, str));
            throw new IllegalArgumentException(str);
        }
        String str2 = this.publisher.wanReplicationName + "/" + this.publisher.wanPublisherId;
        this.nodeEngine.getManagementCenterService().log(new WanConsistencyCheckStartedEvent(wanConsistencyCheckEvent.getUuid(), this.publisher.wanReplicationName, this.publisher.wanPublisherId, objectName));
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Checking via Merkle trees if map " + objectName + " is consistent with cluster " + str2);
        }
        this.lastConsistencyCheckResults.put(objectName, new ConsistencyCheckResult(wanConsistencyCheckEvent.getUuid(), -1, -1, -1, -1, -1));
        ConsistencyCheckResult consistencyCheckResult = new ConsistencyCheckResult(wanConsistencyCheckEvent.getUuid());
        try {
            List<Integer> localPartitions = getLocalPartitions(wanConsistencyCheckEvent);
            Map<Integer, int[]> compareMerkleTrees = compareMerkleTrees(objectName, localPartitions);
            if (compareMerkleTrees != null) {
                consistencyCheckResult = new ConsistencyCheckResult(wanConsistencyCheckEvent.getUuid(), localPartitions.size(), compareMerkleTrees.size(), getMerkleTreeLeaves(compareMerkleTrees) * localPartitions.size(), getDiffLeafCount(compareMerkleTrees), getEntriesToSync(objectName, compareMerkleTrees));
                wanConsistencyCheckEvent.getProcessingResult().addProcessedPartitions(localPartitions);
            }
            int lastEntriesToSync = consistencyCheckResult.getLastEntriesToSync();
            int lastCheckedPartitionCount = consistencyCheckResult.getLastCheckedPartitionCount();
            int lastDiffPartitionCount = consistencyCheckResult.getLastDiffPartitionCount();
            this.nodeEngine.getManagementCenterService().log(new WanConsistencyCheckFinishedEvent(wanConsistencyCheckEvent.getUuid(), this.publisher.wanReplicationName, this.publisher.wanPublisherId, objectName, lastDiffPartitionCount, lastCheckedPartitionCount, lastEntriesToSync));
            if (this.logger.isFineEnabled()) {
                this.logger.fine("Consistency check for map " + objectName + " with cluster " + str2 + " has completed: " + lastDiffPartitionCount + " partitions out of " + lastCheckedPartitionCount + " are not consistent, " + lastEntriesToSync + " entries need to be synchronized.");
            }
        } finally {
            this.lastConsistencyCheckResults.put(objectName, consistencyCheckResult);
        }
    }

    private int getDiffLeafCount(Map<Integer, int[]> map) {
        int i = 0;
        Iterator<int[]> it = map.values().iterator();
        while (it.hasNext()) {
            i += it.next().length / 2;
        }
        return i;
    }

    private int getMerkleTreeLeaves(Map<Integer, int[]> map) {
        Iterator<int[]> it = map.values().iterator();
        if (it.hasNext()) {
            return MerkleTreeUtil.getNodesOnLevel(MerkleTreeUtil.getLevelOfNode(it.next()[0]));
        }
        return 0;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private int getEntriesToSync(String str, Map<Integer, int[]> map) {
        int i = 0;
        for (Map.Entry<Integer, int[]> entry : map.entrySet()) {
            Integer key = entry.getKey();
            int[] value = entry.getValue();
            int[] iArr = new int[value.length / 2];
            for (int i2 = 0; i2 < iArr.length; i2++) {
                iArr[i2] = value[i2 * 2];
            }
            i += ((Integer) this.nodeEngine.getOperationService().invokeOnPartition(MapService.SERVICE_NAME, new MerkleTreeGetEntryCountOperation(str, iArr), key.intValue()).joinInternal()).intValue();
        }
        return i;
    }

    @Override // com.hazelcast.enterprise.wan.impl.replication.WanPublisherSyncSupport
    public void removeReplicationEvent(WanEnterpriseMapEvent wanEnterpriseMapEvent) {
        WanEnterpriseMapMerkleTreeNode wanEnterpriseMapMerkleTreeNode = (WanEnterpriseMapMerkleTreeNode) wanEnterpriseMapEvent;
        WanSyncContext<WanMerkleTreeSyncStats> wanSyncContext = this.syncContextMap.get(wanEnterpriseMapMerkleTreeNode.getUuid());
        int partitionId = wanEnterpriseMapMerkleTreeNode.getPartitionId();
        int entryCount = wanEnterpriseMapMerkleTreeNode.getEntryCount();
        String mapName = wanEnterpriseMapEvent.getMapName();
        updateOnPartitionSync(wanSyncContext, entryCount, mapName, wanSyncContext.getSyncStats(mapName), partitionId);
    }

    private void updateOnPartitionSync(WanSyncContext<WanMerkleTreeSyncStats> wanSyncContext, int i, String str, WanMerkleTreeSyncStats wanMerkleTreeSyncStats, int i2) {
        this.updateSerializingExecutor.execute(() -> {
            int addAndGet = wanSyncContext.getSyncCounter(str, i2).addAndGet(-i);
            wanMerkleTreeSyncStats.onSyncLeaf(i);
            if (addAndGet == 0) {
                wanMerkleTreeSyncStats.onSyncPartition();
                writeManagementCenterProgressUpdateEvent(wanSyncContext.getUuid(), str, wanMerkleTreeSyncStats.getPartitionsSynced(), wanMerkleTreeSyncStats, wanMerkleTreeSyncStats.getRecordsSynced());
                completeSyncContext(wanSyncContext, str, wanMerkleTreeSyncStats);
            }
        });
    }

    private void completeSyncContext(WanSyncContext<WanMerkleTreeSyncStats> wanSyncContext, String str, WanMerkleTreeSyncStats wanMerkleTreeSyncStats) {
        if (wanMerkleTreeSyncStats.getPartitionsToSync() == wanMerkleTreeSyncStats.getPartitionsSynced()) {
            wanSyncContext.onMapSynced();
            wanMerkleTreeSyncStats.onSyncComplete();
            logSyncStats(wanMerkleTreeSyncStats, str);
            writeManagementCenterSyncFinishedEvent(wanSyncContext.getUuid(), str, wanMerkleTreeSyncStats);
            cleanupSyncContextMap();
        }
    }

    private void cleanupSyncContextMap() {
        for (Map.Entry<UUID, WanSyncContext<WanMerkleTreeSyncStats>> entry : this.syncContextMap.entrySet()) {
            UUID key = entry.getKey();
            if (entry.getValue().isCompletedOrStuck()) {
                this.syncContextMap.remove(key);
            }
        }
    }

    @Override // com.hazelcast.enterprise.wan.impl.replication.WanPublisherSyncSupport
    public void processEvent(WanSyncEvent wanSyncEvent) throws Exception {
        List<Integer> localPartitions = getLocalPartitions(wanSyncEvent);
        UUID uuid = wanSyncEvent.getUuid();
        if (wanSyncEvent.getType() != WanSyncType.ALL_MAPS) {
            String objectName = wanSyncEvent.getObjectName();
            if (!isMapWanReplicated(objectName)) {
                String str = "WAN synchronization requested for map " + objectName + " that is not configured for WAN replication";
                this.nodeEngine.getManagementCenterService().log(new WanSyncIgnoredEvent(wanSyncEvent.getUuid(), this.publisher.wanReplicationName, this.publisher.wanPublisherId, objectName, str));
                throw new IllegalArgumentException(str);
            }
            this.lastConsistencyCheckResults.put(objectName, new ConsistencyCheckResult(uuid, -1, -1, -1, -1, -1));
            WanSyncContext<WanMerkleTreeSyncStats> wanSyncContext = new WanSyncContext<>(uuid, localPartitions.size(), Collections.singletonList(objectName));
            this.syncContextMap.put(uuid, wanSyncContext);
            processMapSync(wanSyncEvent, wanSyncContext, objectName, localPartitions);
            return;
        }
        LinkedList<String> linkedList = new LinkedList();
        this.mapService.getMapServiceContext().getMapContainers().keySet().forEach(str2 -> {
            if (isMapWanReplicated(str2)) {
                linkedList.add(str2);
            }
        });
        if (CollectionUtil.isEmpty(linkedList)) {
            this.nodeEngine.getManagementCenterService().log(new WanSyncIgnoredEvent(wanSyncEvent.getUuid(), this.publisher.wanReplicationName, this.publisher.wanPublisherId, null, "No maps found to synchronize."));
            return;
        }
        WanSyncContext<WanMerkleTreeSyncStats> wanSyncContext2 = new WanSyncContext<>(uuid, localPartitions.size(), linkedList);
        this.syncContextMap.put(uuid, wanSyncContext2);
        for (String str3 : linkedList) {
            this.lastConsistencyCheckResults.put(str3, new ConsistencyCheckResult(uuid, -1, -1, -1, -1, -1));
            processMapSync(wanSyncEvent, wanSyncContext2, str3, localPartitions);
        }
    }

    private boolean isMapWanReplicated(String str) {
        return this.mapService.getMapServiceContext().getMapContainer(str).isWanReplicationEnabled();
    }

    private void processMapSync(WanSyncEvent wanSyncEvent, WanSyncContext<WanMerkleTreeSyncStats> wanSyncContext, String str, List<Integer> list) throws Exception {
        String str2 = this.publisher.wanReplicationName + "/" + this.publisher.wanPublisherId;
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Synchronizing map " + str + " to cluster " + str2 + " by using Merkle trees");
        }
        UUID uuid = wanSyncEvent.getUuid();
        ConsistencyCheckResult consistencyCheckResult = new ConsistencyCheckResult(uuid);
        try {
            this.nodeEngine.getManagementCenterService().log(new WanConsistencyCheckStartedEvent(uuid, this.publisher.wanReplicationName, this.publisher.wanPublisherId, str));
            if (this.logger.isFineEnabled()) {
                this.logger.fine("Comparing Merkle trees of map " + str + " with cluster " + str2 + " to identify the difference");
            }
            Map<Integer, int[]> compareMerkleTrees = compareMerkleTrees(str, list);
            Set<Integer> processedPartitions = wanSyncEvent.getProcessingResult().getProcessedPartitions();
            if (compareMerkleTrees == null || compareMerkleTrees.isEmpty()) {
                WanMerkleTreeSyncStats wanMerkleTreeSyncStats = new WanMerkleTreeSyncStats(wanSyncContext.getUuid(), 0);
                wanSyncContext.addSyncStats(str, wanMerkleTreeSyncStats);
                this.lastSyncStats.put(str, wanMerkleTreeSyncStats);
                if (this.logger.isFineEnabled()) {
                    this.logger.fine("Map " + str + " found to be consistent with cluster " + str2 + ", no synchronization is needed");
                }
                this.nodeEngine.getManagementCenterService().log(new WanConsistencyCheckFinishedEvent(uuid, this.publisher.wanReplicationName, this.publisher.wanPublisherId, str, 0, list.size(), 0));
                this.nodeEngine.getManagementCenterService().log(new WanSyncStartedEvent(uuid, this.publisher.wanReplicationName, this.publisher.wanPublisherId, str));
                completeSyncContext(wanSyncContext, str, wanSyncContext.getSyncStats(str));
                this.lastConsistencyCheckResults.put(str, consistencyCheckResult);
                return;
            }
            int entriesToSync = getEntriesToSync(str, compareMerkleTrees);
            ConsistencyCheckResult consistencyCheckResult2 = new ConsistencyCheckResult(uuid, list.size(), compareMerkleTrees.size(), getMerkleTreeLeaves(compareMerkleTrees) * list.size(), getDiffLeafCount(compareMerkleTrees), entriesToSync);
            this.nodeEngine.getManagementCenterService().log(new WanConsistencyCheckFinishedEvent(uuid, this.publisher.wanReplicationName, this.publisher.wanPublisherId, str, compareMerkleTrees.size(), list.size(), entriesToSync));
            if (this.logger.isFineEnabled()) {
                this.logger.fine("Merkle tree comparison for map " + str + " with cluster " + str2 + " has completed: " + compareMerkleTrees.size() + " partitions out of " + list.size() + " need to be synced");
            }
            this.nodeEngine.getManagementCenterService().log(new WanSyncStartedEvent(uuid, this.publisher.wanReplicationName, this.publisher.wanPublisherId, str));
            syncDifferences(wanSyncContext, str, compareMerkleTrees, processedPartitions);
            this.lastConsistencyCheckResults.put(str, consistencyCheckResult2);
        } catch (Throwable th) {
            this.lastConsistencyCheckResults.put(str, consistencyCheckResult);
            throw th;
        }
    }

    private void syncDifferences(WanSyncContext<WanMerkleTreeSyncStats> wanSyncContext, String str, Map<Integer, int[]> map, Set<Integer> set) {
        WanMerkleTreeSyncStats wanMerkleTreeSyncStats = new WanMerkleTreeSyncStats(wanSyncContext.getUuid(), map.size());
        wanSyncContext.addSyncStats(str, wanMerkleTreeSyncStats);
        this.lastSyncStats.put(str, wanMerkleTreeSyncStats);
        for (Map.Entry<Integer, int[]> entry : map.entrySet()) {
            Integer key = entry.getKey();
            Collection<MerkleTreeNodeEntries> collection = (Collection) this.nodeEngine.getOperationService().invokeOnPartition(MapService.SERVICE_NAME, new MerkleTreeGetEntriesOperation(str, entry.getValue()), key.intValue()).joinInternal();
            AtomicInteger syncCounter = wanSyncContext.getSyncCounter(str, key.intValue());
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                syncCounter.addAndGet(((MerkleTreeNodeEntries) it.next()).getNodeEntries().size());
            }
            for (MerkleTreeNodeEntries merkleTreeNodeEntries : collection) {
                if (!merkleTreeNodeEntries.getNodeEntries().isEmpty()) {
                    this.publisher.putToSyncEventQueue(new WanEnterpriseMapMerkleTreeNode(wanSyncContext.getUuid(), str, merkleTreeNodeEntries, key.intValue()));
                }
            }
            if (syncCounter.get() == 0) {
                updateOnPartitionSync(wanSyncContext, 0, str, wanMerkleTreeSyncStats, key.intValue());
            }
            set.add(key);
        }
    }

    private void logSyncStats(WanMerkleTreeSyncStats wanMerkleTreeSyncStats, String str) {
        this.logger.info(String.format("Synchronization finished for map '%s' %n%nMerkle synchronization statistics:%n\t Synchronization UUID: %s%n\t Duration: %d secs%n\t Total records synchronized: %d%n\t Total partitions synchronized: %d%n\t Total Merkle tree nodes synchronized: %d%n\t Average records per Merkle tree node: %.2f%n\t StdDev of records per Merkle tree node: %.2f%n\t Minimum records per Merkle tree node: %d%n\t Maximum records per Merkle tree node: %d%n", str, wanMerkleTreeSyncStats.getUuid(), Long.valueOf(wanMerkleTreeSyncStats.getDurationSecs()), Integer.valueOf(wanMerkleTreeSyncStats.getRecordsSynced()), Integer.valueOf(wanMerkleTreeSyncStats.getPartitionsSynced()), Integer.valueOf(wanMerkleTreeSyncStats.getNodesSynced()), Double.valueOf(wanMerkleTreeSyncStats.getAvgEntriesPerLeaf()), Double.valueOf(wanMerkleTreeSyncStats.getStdDevEntriesPerLeaf()), Integer.valueOf(wanMerkleTreeSyncStats.getMinLeafEntryCount()), Integer.valueOf(wanMerkleTreeSyncStats.getMaxLeafEntryCount())));
    }

    private void writeManagementCenterSyncFinishedEvent(UUID uuid, String str, WanMerkleTreeSyncStats wanMerkleTreeSyncStats) {
        this.nodeEngine.getManagementCenterService().log(new WanMerkleSyncFinishedEvent(uuid, this.publisher.wanReplicationName, this.publisher.wanPublisherId, str, wanMerkleTreeSyncStats.getDurationSecs(), wanMerkleTreeSyncStats.getPartitionsSynced(), wanMerkleTreeSyncStats.getNodesSynced(), wanMerkleTreeSyncStats.getRecordsSynced(), wanMerkleTreeSyncStats.getMinLeafEntryCount(), wanMerkleTreeSyncStats.getMaxLeafEntryCount(), wanMerkleTreeSyncStats.getAvgEntriesPerLeaf(), wanMerkleTreeSyncStats.getStdDevEntriesPerLeaf()));
    }

    private void writeManagementCenterProgressUpdateEvent(UUID uuid, String str, int i, WanMerkleTreeSyncStats wanMerkleTreeSyncStats, int i2) {
        this.nodeEngine.getManagementCenterService().log(new WanSyncProgressUpdateEvent(uuid, this.publisher.wanReplicationName, this.publisher.wanPublisherId, str, wanMerkleTreeSyncStats.getPartitionsToSync(), i, i2));
    }

    private Map<Integer, int[]> compareMerkleTrees(String str, List<Integer> list) throws Exception {
        if (list.isEmpty()) {
            return null;
        }
        Map<Integer, int[]> createHashMap = MapUtil.createHashMap(list.size());
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            createHashMap.put(it.next(), new int[0]);
        }
        MerkleTreeComparisonProcessor merkleTreeComparisonProcessor = new MerkleTreeComparisonProcessor();
        while (true) {
            Map<Integer, int[]> invokeLocal = invokeLocal(str, createHashMap);
            merkleTreeComparisonProcessor.processLocalNodeValues(invokeLocal);
            if (merkleTreeComparisonProcessor.isComparisonFinished()) {
                return merkleTreeComparisonProcessor.getDifference();
            }
            Map<Integer, int[]> compareWithRemoteCluster = compareWithRemoteCluster(str, invokeLocal);
            merkleTreeComparisonProcessor.processRemoteNodeValues(compareWithRemoteCluster);
            if (merkleTreeComparisonProcessor.isComparisonFinished()) {
                return merkleTreeComparisonProcessor.getDifference();
            }
            createHashMap = compareWithRemoteCluster;
        }
    }

    private Map<Integer, int[]> invokeLocal(String str, Map<Integer, int[]> map) throws Exception {
        return this.nodeEngine.getOperationService().invokeOnPartitions(MapService.SERVICE_NAME, new MerkleTreeNodeCompareOperationFactory(str, new MerkleTreeNodeValueComparison(map)), map.keySet());
    }

    private Map<Integer, int[]> removeIdenticalPartitions(Map<Integer, int[]> map) {
        Iterator<Map.Entry<Integer, int[]>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            int[] value = it.next().getValue();
            if (value != null && value.length == 0) {
                it.remove();
            }
        }
        return map;
    }

    private Map<Integer, int[]> compareWithRemoteCluster(String str, Map<Integer, int[]> map) {
        List<Address> awaitAndGetTargetEndpoints = this.publisher.getConnectionManager().awaitAndGetTargetEndpoints();
        if (awaitAndGetTargetEndpoints.isEmpty()) {
            return null;
        }
        MerkleTreeNodeValueComparison merkleTreeNodeValueComparison = (MerkleTreeNodeValueComparison) invokeOnWanTarget(awaitAndGetTargetEndpoints.get(map.keySet().iterator().next().intValue() % awaitAndGetTargetEndpoints.size()), new WanMerkleTreeNodeCompareOperation(str, new MerkleTreeNodeValueComparison(map)));
        HashMap hashMap = new HashMap(merkleTreeNodeValueComparison.getPartitionIds().size());
        Iterator<Integer> it = merkleTreeNodeValueComparison.getPartitionIds().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            hashMap.put(Integer.valueOf(intValue), merkleTreeNodeValueComparison.getMerkleTreeNodeValues(intValue));
        }
        return removeIdenticalPartitions(hashMap);
    }

    @Override // com.hazelcast.enterprise.wan.impl.replication.WanPublisherSyncSupport
    public Map<String, ConsistencyCheckResult> getLastConsistencyCheckResults() {
        return this.lastConsistencyCheckResults;
    }

    @Override // com.hazelcast.enterprise.wan.impl.replication.WanPublisherSyncSupport
    public Map<String, WanSyncStats> getLastSyncStats() {
        return Collections.unmodifiableMap(this.lastSyncStats);
    }

    @Override // com.hazelcast.enterprise.wan.impl.replication.WanPublisherSyncSupport
    public void destroyMapData(String str) {
        this.lastConsistencyCheckResults.remove(str);
        this.lastSyncStats.remove(str);
    }

    private List<Integer> getLocalPartitions(WanAntiEntropyEvent wanAntiEntropyEvent) {
        Set<Integer> partitionSet = wanAntiEntropyEvent.getPartitionSet();
        InternalPartitionService partitionService = this.nodeEngine.getPartitionService();
        LinkedList linkedList = new LinkedList();
        if (CollectionUtil.isEmpty(partitionSet)) {
            for (IPartition iPartition : partitionService.getPartitions()) {
                if (iPartition.isLocal()) {
                    linkedList.add(Integer.valueOf(iPartition.getPartitionId()));
                }
            }
        } else {
            Iterator<Integer> it = partitionSet.iterator();
            while (it.hasNext()) {
                InternalPartition partition = partitionService.getPartition(it.next().intValue());
                if (partition.isLocal()) {
                    linkedList.add(Integer.valueOf(partition.getPartitionId()));
                }
            }
        }
        return linkedList;
    }

    private <T> T invokeOnWanTarget(Address address, Operation operation) {
        long calculateDeadline = calculateDeadline();
        int i = 0;
        while (true) {
            if (System.nanoTime() >= calculateDeadline && i >= 10) {
                throw new IllegalStateException("Could not obtain a connection to " + address + " within 10 seconds after " + i + " attempts");
            }
            WanConnectionWrapper connection = this.publisher.getConnectionManager().getConnection(address);
            if (connection != null) {
                return (T) this.nodeEngine.getOperationService().createInvocationBuilder(WanReplicationService.SERVICE_NAME, operation, connection.getConnection().getRemoteAddress()).setTryCount(1).setConnectionManager(connection.getConnection().getConnectionManager()).setCallTimeout(this.configurationContext.getResponseTimeoutMillis()).invoke().joinInternal();
            }
            int i2 = i;
            i++;
            this.wanTargetInvocationIdleStrategy.idle(i2);
        }
    }

    private long calculateDeadline() {
        return System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
    }
}
