package com.hazelcast.enterprise.wan.impl.replication;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.AbstractWanPublisherConfig;
import com.hazelcast.config.ConsistencyCheckStrategy;
import com.hazelcast.config.WanReplicationConfig;
import com.hazelcast.config.WanSyncConfig;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.enterprise.wan.impl.AbstractWanAntiEntropyEvent;
import com.hazelcast.enterprise.wan.impl.WanConsistencyCheckEvent;
import com.hazelcast.enterprise.wan.impl.WanSyncEvent;
import com.hazelcast.enterprise.wan.impl.sync.WanAntiEntropyEventResult;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.diagnostics.StoreLatencyPlugin;
import com.hazelcast.internal.nio.ClassLoaderUtil;
import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.internal.util.concurrent.IdleStrategy;
import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.wan.impl.InternalWanEvent;
import com.hazelcast.wan.impl.WanAntiEntropyEvent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

/* loaded from: input_file:WEB-INF/lib/hazelcast-jet-enterprise-4.3.jar:com/hazelcast/enterprise/wan/impl/replication/WanBatchPublisher.class */
public class WanBatchPublisher extends AbstractWanReplication implements Runnable, LiveOperationsTracker {
    public static final String WAN_BATCH_SENDER_CLASS = "hazelcast.wan.wanBatchSenderClass";
    public static final String WAN_EXECUTOR = "hz:wan";
    private static final int IDLE_MAX_SPINS = 20;
    private static final int IDLE_MAX_YIELDS = 50;
    protected WanBatchSender wanBatchSender;
    private final AtomicLong failedTransmitCount = new AtomicLong();
    private final Set<Operation> liveOperations = Collections.newSetFromMap(new ConcurrentHashMap());
    private final ReentrantLock syncLock = new ReentrantLock();
    private AtomicLong ongoingSyncInvocations = new AtomicLong();
    private volatile long lastBatchSendTime = System.currentTimeMillis();
    private Executor wanExecutor;
    private BlockingQueue<InternalWanEvent> syncEvents;
    private IdleStrategy idlingStrategy;
    private ArrayList<InternalWanEvent> eventBatchHolder;
    private BatchReplicationStrategy replicationStrategy;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Override // com.hazelcast.enterprise.wan.impl.replication.AbstractWanReplication, com.hazelcast.enterprise.wan.impl.replication.AbstractWanPublisher, com.hazelcast.wan.WanPublisher
    public void init(WanReplicationConfig wanReplicationConfig, AbstractWanPublisherConfig abstractWanPublisherConfig) {
        super.init(wanReplicationConfig, abstractWanPublisherConfig);
        this.idlingStrategy = new BackoffIdleStrategy(20L, 50L, this.configurationContext.getIdleMinParkNs(), this.configurationContext.getIdleMaxParkNs());
        this.wanExecutor = this.node.getNodeEngine().getExecutionService().getExecutor(WAN_EXECUTOR);
        this.wanBatchSender = createWanBatchSender(this.node);
        this.syncEvents = new LinkedBlockingQueue(this.configurationContext.getBatchSize());
        this.eventBatchHolder = new ArrayList<>(this.configurationContext.getBatchSize());
        int maxConcurrentInvocations = this.configurationContext.getMaxConcurrentInvocations();
        this.logger.fine("Initialising WAN batch publisher with " + maxConcurrentInvocations + " max invocations.");
        if (maxConcurrentInvocations > 1) {
            this.replicationStrategy = new ConcurrentBatchReplicationStrategy(maxConcurrentInvocations);
        } else {
            this.replicationStrategy = new SerialBatchReplicationStrategy();
        }
        this.wanExecutor.execute(this);
    }

    @Override // java.lang.Runnable
    public void run() {
        int i = 0;
        while (this.running) {
            try {
                if (tryMakeProgress()) {
                    i = 0;
                } else {
                    int i2 = i;
                    i++;
                    this.idlingStrategy.idle(i2);
                }
            } catch (Exception e) {
                this.logger.severe("Exception occurred in WAN replication loop", e);
            }
        }
    }

    private boolean tryMakeProgress() {
        Address nextEventBatchEndpoint;
        List<Address> targetEndpoints = getTargetEndpoints();
        if (targetEndpoints.isEmpty() || (nextEventBatchEndpoint = this.replicationStrategy.getNextEventBatchEndpoint(targetEndpoints)) == null) {
            return false;
        }
        if (!this.syncEvents.isEmpty()) {
            WanEventBatch wanEventBatch = new WanEventBatch(this.configurationContext.isSnapshotEnabled());
            ArrayList arrayList = new ArrayList(this.configurationContext.getBatchSize());
            this.syncEvents.drainTo(arrayList, this.configurationContext.getBatchSize());
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                wanEventBatch.addEvent((InternalWanEvent) it.next());
            }
            if (!sendBatch(nextEventBatchEndpoint, wanEventBatch, true)) {
                return true;
            }
            this.ongoingSyncInvocations.incrementAndGet();
            return true;
        }
        if (this.state.isReplicateEnqueuedEvents() && this.syncLock.tryLock()) {
            WanEventBatch wanEventBatch2 = null;
            try {
                if (!hasOngoingSync()) {
                    wanEventBatch2 = collectEventBatch(nextEventBatchEndpoint, targetEndpoints);
                }
                if (wanEventBatch2 != null) {
                    sendBatch(nextEventBatchEndpoint, wanEventBatch2, false);
                    return true;
                }
            } finally {
                this.syncLock.unlock();
            }
        }
        this.replicationStrategy.complete(nextEventBatchEndpoint);
        return false;
    }

    private boolean hasOngoingSync() {
        if ($assertionsDisabled || this.syncLock.isHeldByCurrentThread()) {
            return !this.syncEvents.isEmpty() || this.ongoingSyncInvocations.get() > 0;
        }
        throw new AssertionError();
    }

    private WanEventBatch collectEventBatch(Address address, List<Address> list) {
        boolean z;
        InternalPartitionService partitionService = this.node.getPartitionService();
        WanEventBatch wanEventBatch = null;
        boolean z2 = false;
        do {
            z = false;
            int firstPartitionId = this.replicationStrategy.getFirstPartitionId(address, list);
            while (true) {
                int i = firstPartitionId;
                if (i >= partitionService.getPartitionCount() || z2) {
                    break;
                }
                if (partitionService.getPartition(i).isLocal()) {
                    int batchSize = this.configurationContext.getBatchSize() - (wanEventBatch == null ? 0 : wanEventBatch.getTotalEntryCount());
                    this.eventBatchHolder.clear();
                    this.eventQueueContainer.drainRandomWanQueue(i, this.eventBatchHolder, batchSize);
                    Iterator<InternalWanEvent> it = this.eventBatchHolder.iterator();
                    while (it.hasNext()) {
                        InternalWanEvent next = it.next();
                        if (wanEventBatch == null) {
                            wanEventBatch = new WanEventBatch(this.configurationContext.isSnapshotEnabled());
                        }
                        wanEventBatch.addEvent(next);
                        z = true;
                    }
                    int totalEntryCount = wanEventBatch == null ? 0 : wanEventBatch.getTotalEntryCount();
                    z2 = totalEntryCount >= this.configurationContext.getBatchSize() || (sendingPeriodPassed() && totalEntryCount > 0) || !this.running;
                }
                firstPartitionId = i + this.replicationStrategy.getPartitionIdStep(address, list);
            }
            if (z2 || sendingPeriodPassed()) {
                break;
            }
        } while (z);
        return wanEventBatch;
    }

    private boolean sendingPeriodPassed() {
        return System.currentTimeMillis() - this.lastBatchSendTime > this.configurationContext.getBatchMaxDelayMillis();
    }

    private boolean sendBatch(Address address, WanEventBatch wanEventBatch, boolean z) {
        if (!this.running) {
            return false;
        }
        try {
            this.wanBatchSender.send(wanEventBatch, address).whenCompleteAsync((BiConsumer) (bool, th) -> {
                if (th == null) {
                    handleWanBatchResponse(wanEventBatch, address, bool.booleanValue(), z);
                } else {
                    handleWanBatchError(wanEventBatch, address, th, z);
                }
            }, this.wanExecutor);
            this.lastBatchSendTime = System.currentTimeMillis();
            return true;
        } catch (Throwable th2) {
            handleWanBatchError(wanEventBatch, address, th2, z);
            return true;
        }
    }

    private void handleWanBatchResponse(WanEventBatch wanEventBatch, Address address, boolean z, boolean z2) {
        if (!z) {
            this.failedTransmitCount.incrementAndGet();
            sendBatch(address, wanEventBatch, z2);
            return;
        }
        Iterator<InternalWanEvent> it = wanEventBatch.getEvents().iterator();
        while (it.hasNext()) {
            incrementEventCount(it.next());
        }
        finalizeWanEventReplication(wanEventBatch.getEvents(), wanEventBatch.getCoalescedEvents());
        this.replicationStrategy.complete(address);
        this.wanCounter.decrementPrimaryElementCounter(wanEventBatch.getPrimaryEventCount());
        if (z2) {
            this.ongoingSyncInvocations.decrementAndGet();
        }
    }

    private void handleWanBatchError(WanEventBatch wanEventBatch, Address address, Throwable th, boolean z) {
        this.logger.warning("Error occurred when sending WAN events to " + address, th);
        this.connectionManager.removeTargetEndpoint(address, "Error occurred when sending WAN events to " + address, th);
        this.failedTransmitCount.incrementAndGet();
        sendBatch(address, wanEventBatch, z);
    }

    @Override // com.hazelcast.enterprise.wan.impl.replication.AbstractWanPublisher
    protected WanPublisherSyncSupport createWanSyncSupport() {
        WanSyncConfig syncConfig = this.configurationContext.getPublisherConfig().getSyncConfig();
        return (syncConfig == null || !ConsistencyCheckStrategy.MERKLE_TREES.equals(syncConfig.getConsistencyCheckStrategy())) ? new WanPublisherFullSyncSupport(this.node, this) : new WanPublisherMerkleTreeSyncSupport(this.node, this.configurationContext, this);
    }

    private WanBatchSender createWanBatchSender(Node node) {
        WanBatchSender createBaseWanBatchSender = createBaseWanBatchSender(node);
        createBaseWanBatchSender.init(node, this);
        StoreLatencyPlugin storeLatencyPlugin = (StoreLatencyPlugin) node.getNodeEngine().getDiagnostics().getPlugin(StoreLatencyPlugin.class);
        return storeLatencyPlugin != null ? new LatencyTrackingWanBatchSender(createBaseWanBatchSender, storeLatencyPlugin, this.wanPublisherId, this.wanExecutor) : createBaseWanBatchSender;
    }

    private WanBatchSender createBaseWanBatchSender(Node node) {
        String property = System.getProperty(WAN_BATCH_SENDER_CLASS);
        if (StringUtil.isNullOrEmpty(property)) {
            return new DefaultWanBatchSender();
        }
        try {
            return (WanBatchSender) ClassLoaderUtil.newInstance(node.getConfigClassLoader(), property);
        } catch (Exception e) {
            throw new HazelcastException("Could not construct WAN batch sender", e);
        }
    }

    @Override // com.hazelcast.wan.impl.InternalWanPublisher
    public void publishAntiEntropyEvent(WanAntiEntropyEvent wanAntiEntropyEvent) {
        AbstractWanAntiEntropyEvent abstractWanAntiEntropyEvent = (AbstractWanAntiEntropyEvent) wanAntiEntropyEvent;
        this.liveOperations.add(abstractWanAntiEntropyEvent.getOp());
        this.wanExecutor.execute(() -> {
            abstractWanAntiEntropyEvent.setProcessingResult(new WanAntiEntropyEventResult());
            try {
                if (!(abstractWanAntiEntropyEvent instanceof WanSyncEvent)) {
                    if (abstractWanAntiEntropyEvent instanceof WanConsistencyCheckEvent) {
                        this.syncSupport.processEvent((WanConsistencyCheckEvent) abstractWanAntiEntropyEvent);
                        return;
                    } else {
                        this.logger.info("Ignoring unknown WAN anti-entropy event " + abstractWanAntiEntropyEvent);
                        return;
                    }
                }
                this.syncLock.lock();
                try {
                    this.syncSupport.processEvent((WanSyncEvent) abstractWanAntiEntropyEvent);
                    this.syncLock.unlock();
                } catch (Throwable th) {
                    this.syncLock.unlock();
                    throw th;
                }
            } catch (Exception e) {
                this.logger.warning("WAN anti-entropy event " + abstractWanAntiEntropyEvent + " processing failed", e);
            } finally {
                abstractWanAntiEntropyEvent.sendResponse();
                this.liveOperations.remove(abstractWanAntiEntropyEvent.getOp());
            }
        });
    }

    @Override // com.hazelcast.spi.impl.operationservice.LiveOperationsTracker
    public void populate(LiveOperations liveOperations) {
        for (Operation operation : this.liveOperations) {
            liveOperations.add(operation.getCallerAddress(), operation.getCallId());
        }
    }

    public long getFailedTransmissionCount() {
        return this.failedTransmitCount.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Executor getWanExecutor() {
        return this.wanExecutor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void putToSyncEventQueue(InternalWanEvent internalWanEvent) {
        try {
            this.syncEvents.put(internalWanEvent);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw ExceptionUtil.rethrow(e);
        }
    }

    public BatchReplicationStrategy getReplicationStrategy() {
        return this.replicationStrategy;
    }

    static {
        $assertionsDisabled = !WanBatchPublisher.class.desiredAssertionStatus();
    }
}
