package com.hazelcast.enterprise.wan.impl.sync;

import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.enterprise.wan.impl.AbstractWanAntiEntropyEvent;
import com.hazelcast.enterprise.wan.impl.EnterpriseWanReplicationService;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.monitor.WanSyncState;
import com.hazelcast.internal.monitor.impl.WanSyncStateImpl;
import com.hazelcast.internal.partition.IPartitionService;
import com.hazelcast.internal.util.collection.InflatableSet;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.operationservice.OperationService;
import com.hazelcast.wan.impl.WanReplicationService;
import com.hazelcast.wan.impl.WanSyncStatus;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/* loaded from: input_file:WEB-INF/lib/hazelcast-jet-enterprise-4.3.jar:com/hazelcast/enterprise/wan/impl/sync/WanSyncManager.class */
/**
 * Coordinates WAN anti-entropy requests (such as WAN sync) started on this member.
 *
 * <p>Only one request may be active at a time. The current state is kept in
 * {@link #syncStatus} and is moved from {@code READY} to {@code IN_PROGRESS} when a
 * request is accepted, then to {@code READY} or {@code FAILED} once the event has been
 * published to the data members (see
 * {@link #publishAntiEntropyEventOnMembers(String, String, AbstractWanAntiEntropyEvent)}).
 */
public class WanSyncManager {
    /** Pause between two publication rounds, in milliseconds. */
    private static final int RETRY_INTERVAL_MILLIS = 5000;
    /** Number of incomplete publication rounds after which the request is marked as failed. */
    private static final int MAX_RETRY_COUNT = 5;
    /** Name of the executor on which the anti-entropy starter operation is invoked. */
    private static final String SYNC_EXECUTOR_NAME = "hz:wan:sync:pool";
    private static final AtomicReferenceFieldUpdater<WanSyncManager, WanSyncStatus> SYNC_STATUS =
            AtomicReferenceFieldUpdater.newUpdater(WanSyncManager.class, WanSyncStatus.class, "syncStatus");

    private final EnterpriseWanReplicationService wanReplicationService;
    private final ILogger logger;
    private final Node node;
    /** Updated only through {@link #SYNC_STATUS}. */
    private volatile WanSyncStatus syncStatus = WanSyncStatus.READY;
    /** Cleared by {@link #shutdown()} to stop further publication retries. */
    private volatile boolean running = true;
    private volatile String activeWanReplicationName;
    private volatile String activePublisherId;

    /**
     * Creates the manager.
     *
     * @param wanReplicationService service used to look up WAN publishers
     * @param node                  the local node; supplies the logger, the cluster,
     *                              partition and operation services
     */
    public WanSyncManager(EnterpriseWanReplicationService wanReplicationService, Node node) {
        this.node = node;
        this.wanReplicationService = wanReplicationService;
        this.logger = node.getLogger(getClass());
    }

    /** Stops the publication retry loop; a round that is already running is not aborted. */
    public void shutdown() {
        running = false;
    }

    /**
     * Accepts an anti-entropy request and hands it over to the WAN sync executor.
     *
     * @param wanReplicationName name of the WAN replication configuration
     * @param wanPublisherId     ID of the WAN publisher within that configuration
     * @param event              the anti-entropy event to process
     * @throws InvalidConfigurationException if no publisher exists for the given name and ID
     * @throws SyncFailedException           if the status is not {@code READY}
     */
    public void initiateAntiEntropyRequest(String wanReplicationName, String wanPublisherId,
                                           AbstractWanAntiEntropyEvent event) {
        if (wanReplicationService.getPublisherOrNull(wanReplicationName, wanPublisherId) == null) {
            String message = String.format("Sync request failed because WAN Replication Config doesn't exist with WAN configuration name %s and publisher ID %s.", wanReplicationName, wanPublisherId);
            if (node.isLiteMember()) {
                message += " If you have added the WAN configuration dynamically, try re-running WAN sync on non-lite members.";
            }
            throw new InvalidConfigurationException(message);
        }

        // NOTE(review): nothing in this class moves the status from FAILED back to READY,
        // so after a failed request this check keeps rejecting new ones - confirm intended.
        if (!SYNC_STATUS.compareAndSet(this, WanSyncStatus.READY, WanSyncStatus.IN_PROGRESS)) {
            throw new SyncFailedException("Another anti-entropy request is already in progress.");
        }
        activeWanReplicationName = wanReplicationName;
        activePublisherId = wanPublisherId;

        try {
            node.getNodeEngine().getExecutionService().execute(SYNC_EXECUTOR_NAME,
                    () -> invokeStarterOperation(wanReplicationName, wanPublisherId, event));
        } catch (RuntimeException e) {
            // The task was never accepted, so nothing else will reset the status;
            // roll it back here, otherwise every later request would be rejected.
            SYNC_STATUS.compareAndSet(this, WanSyncStatus.IN_PROGRESS, WanSyncStatus.READY);
            throw e;
        }
        logger.info("WAN anti-entropy request " + event + " has been sent");
    }

    /** Invokes the starter operation on the local member and logs how it completed. */
    private void invokeStarterOperation(String wanReplicationName, String wanPublisherId,
                                        AbstractWanAntiEntropyEvent event) {
        getOperationService()
                .invokeOnTarget(WanReplicationService.SERVICE_NAME,
                        new WanAntiEntropyEventStarterOperation(wanReplicationName, wanPublisherId, event),
                        node.getThisAddress())
                .whenCompleteAsync((response, failure) -> {
                    if (failure == null) {
                        logger.info("WAN anti-entropy request " + event + " has been processed");
                    } else {
                        logger.warning("WAN anti-entropy request " + event + " processing failed", failure);
                    }
                });
    }

    /**
     * Returns a snapshot of the current status together with the WAN replication name
     * and publisher ID of the most recently accepted request.
     */
    public WanSyncState getWanSyncState() {
        return new WanSyncStateImpl(syncStatus, activeWanReplicationName, activePublisherId);
    }

    // Decompiler note: this method was package-private in the original class.
    /**
     * Publishes the event to all data members, retrying for the partitions that were
     * not reported as processed.
     *
     * <p>At most {@link #MAX_RETRY_COUNT} rounds are made, {@link #RETRY_INTERVAL_MILLIS}
     * apart. When the method exits - normally or with an exception - the status is set to
     * {@code FAILED} if the retry limit was reached and to {@code READY} otherwise.
     *
     * @param wanReplicationName name of the WAN replication configuration
     * @param wanPublisherId     ID of the WAN publisher
     * @param event              the event to publish; a {@code null} partition set is
     *                           tracked as "all partitions"
     */
    public void publishAntiEntropyEventOnMembers(String wanReplicationName, String wanPublisherId,
                                                 AbstractWanAntiEntropyEvent event) {
        int attempts = 0;
        try {
            Set<Integer> pendingPartitions = event.getPartitionSet();
            while (running) {
                // broadcastEvent resolves a null set to all partitions and returns the set it
                // tracked; continuing with that result avoids dereferencing null below.
                pendingPartitions = broadcastEvent(wanReplicationName, wanPublisherId, event, pendingPartitions);
                if (pendingPartitions.isEmpty()) {
                    break;
                }
                attempts++;
                if (attempts == MAX_RETRY_COUNT) {
                    logger.warning(String.format("WAN anti-entropy event publication failed after %s attempts with %s partitions not processed", MAX_RETRY_COUNT, pendingPartitions.size()));
                    break;
                }
                logger.info(String.format("WAN anti-entropy event publication will retry because %s partitions have not been processed", pendingPartitions.size()));
                try {
                    Thread.sleep(RETRY_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller; the loop itself carries on.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            SYNC_STATUS.set(this, attempts == MAX_RETRY_COUNT ? WanSyncStatus.FAILED : WanSyncStatus.READY);
        }
    }

    /**
     * Sends a copy of the event to every data member and waits for the results.
     *
     * @param partitions partitions to put on the sent event; may be {@code null}, in which
     *                   case the event is sent with a {@code null} set and all partitions
     *                   are tracked locally
     * @return the tracked partition set with every processed partition removed (this is
     *         {@code partitions} itself when it was not {@code null})
     */
    private Set<Integer> broadcastEvent(String wanReplicationName, String wanPublisherId,
                                        AbstractWanAntiEntropyEvent event, Set<Integer> partitions) {
        Collection<Member> dataMembers = getClusterService().getMembers(MemberSelectors.DATA_MEMBER_SELECTOR);
        List<Future<WanAntiEntropyEventResult>> futures = new ArrayList<>(dataMembers.size());
        for (Member member : dataMembers) {
            AbstractWanAntiEntropyEvent memberEvent = event.cloneWithoutPartitionKeys();
            memberEvent.setPartitionSet(partitions);
            Future<WanAntiEntropyEventResult> future = getOperationService().invokeOnTarget(
                    WanReplicationService.SERVICE_NAME,
                    new WanAntiEntropyEventPublishOperation(wanReplicationName, wanPublisherId, memberEvent),
                    member.getAddress());
            futures.add(future);
        }
        Set<Integer> pending = partitions != null ? partitions : getAllPartitions();
        addResultOfOps(futures, pending);
        return pending;
    }

    private OperationService getOperationService() {
        return node.getNodeEngine().getOperationService();
    }

    /**
     * Waits for each invocation and removes the partitions it reports as processed from
     * {@code pendingPartitions}. Failures are deliberately not propagated: the affected
     * partitions simply stay pending, and only the first failure is logged.
     */
    private void addResultOfOps(List<Future<WanAntiEntropyEventResult>> futures, Set<Integer> pendingPartitions) {
        boolean failureLogged = false;
        for (Future<WanAntiEntropyEventResult> future : futures) {
            try {
                pendingPartitions.removeAll(future.get().getProcessedPartitions());
            } catch (Exception e) {
                if (!failureLogged) {
                    logger.warning("Exception occurred during WAN sync, missing WAN sync objects will be retried.", e);
                    failureLogged = true;
                }
            }
        }
    }

    /** Builds a set holding every partition ID from 0 up to the partition count (exclusive). */
    private Set<Integer> getAllPartitions() {
        int partitionCount = getPartitionService().getPartitionCount();
        InflatableSet.Builder<Integer> builder = InflatableSet.newBuilder(partitionCount);
        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            builder.add(partitionId);
        }
        return builder.build();
    }

    private IPartitionService getPartitionService() {
        return node.getPartitionService();
    }

    private ClusterService getClusterService() {
        return node.getClusterService();
    }

    /** Overrides the WAN replication name reported by {@link #getWanSyncState()}. */
    public void setActiveWanReplicationName(String wanReplicationName) {
        this.activeWanReplicationName = wanReplicationName;
    }

    /** Overrides the publisher ID reported by {@link #getWanSyncState()}. */
    public void setActivePublisherId(String publisherId) {
        this.activePublisherId = publisherId;
    }
}
