package com.hazelcast.enterprise.wan.impl;

import com.hazelcast.config.WanAcknowledgeType;
import com.hazelcast.enterprise.wan.impl.operation.WanEventContainerOperation;
import com.hazelcast.enterprise.wan.impl.replication.WanEventBatch;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.metrics.MetricDescriptorConstants;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.WanSupportingService;
import com.hazelcast.internal.util.ThreadUtil;
import com.hazelcast.internal.util.executor.StripedExecutor;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.wan.impl.InternalWanEvent;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/hazelcast-jet-enterprise-4.3.jar:com/hazelcast/enterprise/wan/impl/WanEventProcessor.class */
public class WanEventProcessor implements LiveOperationsTracker {
    private static final int STRIPED_RUNNABLE_JOB_QUEUE_SIZE = 1000;
    private static final int DEFAULT_KEY_FOR_STRIPED_EXECUTORS = -1;
    private final ILogger logger;
    private final Node node;
    private volatile StripedExecutor executor;
    private final Object executorMutex = new Object();
    private final Set<Operation> liveOperations = Collections.newSetFromMap(new ConcurrentHashMap());
    private final WanAcknowledger acknowledger = createAcknowledger();

    /* JADX INFO: Access modifiers changed from: package-private */
    public WanEventProcessor(Node node) {
        this.logger = node.getLogger(WanEventProcessor.class.getName());
        this.node = node;
    }

    private WanAcknowledger createAcknowledger() {
        int integer = this.node.getProperties().getInteger(ClusterProperty.WAN_CONSUMER_INVOCATION_THRESHOLD);
        return integer <= 0 ? new WanNonThrottlingAcknowledger(this.node) : new WanThrottlingAcknowledger(this.node, integer);
    }

    public void handleRepEvent(WanEventBatch wanEventBatch, WanEventContainerOperation wanEventContainerOperation) {
        Collection<InternalWanEvent> events = wanEventBatch.getEvents();
        executeAndNotify(new BatchWanEventRunnable(wanEventBatch, wanEventContainerOperation, events.isEmpty() ? -1 : getPartitionId(events.iterator().next().getKey()), this.node.getNodeEngine(), this.liveOperations, this.logger, this.acknowledger), wanEventContainerOperation);
    }

    public void handleRepEvent(InternalWanEvent internalWanEvent, WanEventContainerOperation wanEventContainerOperation) {
        executeAndNotify(new WanEventRunnable(internalWanEvent, wanEventContainerOperation, getPartitionId(internalWanEvent.getKey()), this.node.getNodeEngine(), this.liveOperations, this.logger, this.acknowledger), wanEventContainerOperation);
    }

    public void handleEvent(InternalWanEvent internalWanEvent, WanAcknowledgeType wanAcknowledgeType) {
        ((WanSupportingService) this.node.getNodeEngine().getService(internalWanEvent.getServiceName())).onReplicationEvent(internalWanEvent, wanAcknowledgeType);
    }

    private void executeAndNotify(Runnable runnable, WanEventContainerOperation wanEventContainerOperation) {
        StripedExecutor executor = getExecutor();
        try {
            this.liveOperations.add(wanEventContainerOperation);
            executor.execute(runnable);
        } catch (RejectedExecutionException e) {
            this.logger.warning("Can not handle incoming wan replication event.", e);
            try {
                wanEventContainerOperation.sendResponse(false);
                this.liveOperations.remove(wanEventContainerOperation);
            } catch (Throwable th) {
                this.liveOperations.remove(wanEventContainerOperation);
                throw th;
            }
        }
    }

    private StripedExecutor getExecutor() {
        StripedExecutor stripedExecutor = this.executor;
        if (stripedExecutor == null) {
            synchronized (this.executorMutex) {
                if (this.executor == null) {
                    this.executor = new StripedExecutor(this.logger, ThreadUtil.createThreadName(this.node.hazelcastInstance.getName(), MetricDescriptorConstants.WAN_PREFIX), 16, 1000);
                }
                stripedExecutor = this.executor;
            }
        }
        return stripedExecutor;
    }

    private int getPartitionId(Data data) {
        return this.node.getNodeEngine().getPartitionService().getPartitionId(data);
    }

    @Override // com.hazelcast.spi.impl.operationservice.LiveOperationsTracker
    public void populate(LiveOperations liveOperations) {
        for (Operation operation : this.liveOperations) {
            liveOperations.add(operation.getCallerAddress(), operation.getCallId());
        }
    }

    public void shutdown() {
        StripedExecutor stripedExecutor = this.executor;
        if (stripedExecutor != null) {
            stripedExecutor.shutdown();
        }
    }
}
