/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.wan.serial;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import org.apache.geode.GemFireException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;

/**
 * Event processor that fans gateway sender events out over a fixed list of
 * {@link SerialGatewaySenderEventProcessor} children.
 *
 * <p>One child is created per dispatcher thread reported by the sender (see
 * {@link #initializeMessageQueue(String)}). Each incoming event is routed to exactly one
 * child, chosen from a hash that depends on the sender's order policy (see
 * {@link #getHashCode(EntryEventImpl)}). Lifecycle operations (start, stop, pause, resume,
 * close) are forwarded to every child.
 */
public class ConcurrentSerialGatewaySenderEventProcessor
        extends AbstractGatewaySenderEventProcessor {
    private static final Logger logger = LogService.getLogger();

    /** Child processors, in creation order; the list index is the routing index. */
    protected final List<SerialGatewaySenderEventProcessor> processors = new ArrayList<SerialGatewaySenderEventProcessor>();

    /** The sender this processor (and every child) works for. */
    protected final AbstractGatewaySender sender;

    /** Startup failure reported by a child, captured in {@link #run()}; {@code null} if none. */
    private GemFireException ex = null;

    /** The queues of all children, collected once in the constructor. */
    private final Set<RegionQueue> queues;

    /**
     * Creates the processor, its child processors and the set of their queues, and marks this
     * thread as a daemon.
     *
     * @param sender the gateway sender whose id, dispatcher thread count and order policy
     *        drive this processor
     */
    public ConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender sender) {
        super(LoggingThreadGroup.createThreadGroup("Event Processor for GatewaySender_" + sender.getId(), logger), "Event Processor for GatewaySender_" + sender.getId(), sender);
        this.sender = sender;
        // Must run after this.sender is assigned: it reads the dispatcher thread count from it.
        this.initializeMessageQueue(sender.getId());
        this.queues = new HashSet<RegionQueue>();
        for (SerialGatewaySenderEventProcessor processor : this.processors) {
            this.queues.add(processor.getQueue());
        }
        this.setDaemon(true);
    }

    /**
     * Creates one {@link SerialGatewaySenderEventProcessor} per dispatcher thread of the
     * sender. Child {@code i} is created with the id {@code id + "." + i}.
     *
     * @param id the base id used to derive each child's id
     */
    @Override
    protected void initializeMessageQueue(String id) {
        final int dispatcherThreads = this.sender.getDispatcherThreads();
        for (int i = 0; i < dispatcherThreads; ++i) {
            this.processors.add(new SerialGatewaySenderEventProcessor(this.sender, id + "." + i));
            if (logger.isDebugEnabled()) {
                logger.debug("Created the SerialGatewayEventProcessor_{}->{}", i, this.processors.get(i));
            }
        }
    }

    /**
     * Returns the combined size of all child queues.
     *
     * @return the sum of {@code size()} over every queue collected at construction time
     */
    @Override
    public int eventQueueSize() {
        int size = 0;
        for (RegionQueue queue : this.queues) {
            size += queue.size();
        }
        return size;
    }

    /**
     * Routes the event to the child selected by the event's hash (see
     * {@link #getHashCode(EntryEventImpl)}) and enqueues it there.
     *
     * @param operation the listener operation that produced the event
     * @param event the event to enqueue; cast to {@link EntryEventImpl} for hashing
     * @param substituteValue passed through unchanged to the child processor
     * @throws IOException propagated from the child processor
     * @throws CacheException propagated from the child processor
     */
    @Override
    public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue) throws IOException, CacheException {
        // The remainder lies strictly between -size and +size, so Math.abs cannot overflow here.
        int index = Math.abs(this.getHashCode((EntryEventImpl)event) % this.processors.size());
        this.enqueueEvent(operation, event, substituteValue, index);
    }

    /**
     * Enqueues the event on the child processor at {@code index}.
     *
     * <p>For the {@code KEY} and {@code PARTITION} order policies the event is first cloned
     * and the clone is given a new {@link EventID} whose thread id is derived from
     * {@code index} and the original thread id; the clone is released once the child has
     * enqueued it. For any other policy the event is enqueued as is.
     *
     * @param operation the listener operation that produced the event
     * @param event the event to enqueue
     * @param substituteValue passed through unchanged to the child processor
     * @param index position of the target child in {@link #processors}
     * @throws CacheException propagated from the child processor
     * @throws IOException propagated from the child processor
     */
    public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, int index) throws CacheException, IOException {
        SerialGatewaySenderEventProcessor serialProcessor = this.processors.get(index);
        GatewaySender.OrderPolicy orderPolicy = this.sender.getOrderPolicy();
        boolean needsRewrittenEventId = orderPolicy == GatewaySender.OrderPolicy.KEY || orderPolicy == GatewaySender.OrderPolicy.PARTITION;
        if (!needsRewrittenEventId) {
            serialProcessor.enqueueEvent(operation, event, substituteValue);
            return;
        }

        EntryEventImpl clonedEvent = new EntryEventImpl((EntryEventImpl)event);
        try {
            EventID originalEventId = clonedEvent.getEventId();
            if (logger.isDebugEnabled()) {
                logger.debug("The original EventId is {}", originalEventId);
            }
            long newThreadId = ThreadIdentifier.createFakeThreadIDForParallelGateway(index, originalEventId.getThreadID(), 0);
            EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId, originalEventId.getSequenceID());
            if (logger.isDebugEnabled()) {
                logger.debug("{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}", this, event.getKey(), index, originalEventId, originalEventId.getThreadID(), newEventId, newThreadId);
            }
            clonedEvent.setEventId(newEventId);
            serialProcessor.enqueueEvent(operation, clonedEvent, substituteValue);
        }
        finally {
            // The clone is owned by this method; always release it, even if enqueueing failed.
            clonedEvent.release();
        }
    }

    /**
     * Starts every child processor, waits until each one has either left the stopped state
     * or reported an exception, publishes the outcome under {@code runningStateLock}, and
     * then blocks until all children have terminated.
     *
     * <p>If this thread is interrupted while joining the children, the interrupt status is
     * restored and the method returns without waiting for the remaining children.
     */
    @Override
    public void run() {
        for (int i = 0; i < this.processors.size(); ++i) {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting the serialProcessor {}", i);
            }
            this.processors.get(i).start();
        }

        try {
            this.waitForRunningStatus();
        }
        catch (GatewaySenderException e) {
            this.ex = e;
        }

        // Publish the startup outcome and wake anyone waiting on this processor's state.
        synchronized (this.runningStateLock) {
            if (this.ex != null) {
                this.setException(this.ex);
                this.setIsStopped(true);
            } else {
                this.setIsStopped(false);
            }
            this.runningStateLock.notifyAll();
        }

        for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
            try {
                serialProcessor.join();
            }
            catch (InterruptedException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Got InterruptedException while waiting for child threads to finish.");
                }
                // Restore the interrupt regardless of the log level. Any further join()
                // would throw immediately with the flag set, so stop waiting here.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Not supported by this processor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected void rebalance() {
        throw new UnsupportedOperationException();
    }

    /**
     * Blocks until every child processor has either left the stopped state or reported an
     * exception.
     *
     * <p>An interrupt received while waiting does not end the wait; it is remembered and the
     * thread's interrupt status is restored before this method returns or throws.
     *
     * @throws GatewaySenderException if a child reports an exception; the new exception's
     *         message includes the child's exception message and its cause is the child
     *         exception's cause
     */
    private void waitForRunningStatus() {
        boolean interrupted = false;
        try {
            for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
                synchronized (serialProcessor.runningStateLock) {
                    while (serialProcessor.getException() == null && serialProcessor.isStopped()) {
                        try {
                            serialProcessor.runningStateLock.wait();
                        }
                        catch (InterruptedException e) {
                            // Re-interrupting here would make the next wait() throw at once
                            // and turn this loop into a busy spin; defer it to the finally.
                            interrupted = true;
                        }
                    }
                    Exception startupFailure = serialProcessor.getException();
                    if (startupFailure != null) {
                        throw new GatewaySenderException(LocalizedStrings.Sender_COULD_NOT_START_GATEWAYSENDER_0_BECAUSE_OF_EXCEPTION_1.toLocalizedString(this.getId(), startupFailure.getMessage()), startupFailure.getCause());
                    }
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Computes the routing hash for an event according to the sender's order policy.
     *
     * <ul>
     * <li>{@code KEY}: the hash code of the event key.</li>
     * <li>{@code THREAD}: the hash of the event id's membership id bytes plus the folded
     * 64-bit thread id.</li>
     * <li>{@code PARTITION}: {@code PartitionedRegionHelper.getHashKey(event)} when the
     * event's region is partitioned, otherwise the hash code of the event key.</li>
     * </ul>
     *
     * @param event the event to hash
     * @return the hash, which may be negative; {@code 0} for any policy not listed above
     */
    private int getHashCode(EntryEventImpl event) {
        int eventHashCode = 0;
        switch (this.sender.getOrderPolicy()) {
            case KEY: {
                eventHashCode = event.getKey().hashCode();
                break;
            }
            case THREAD: {
                EventID eventId = event.getEventId();
                byte[] memberId = eventId.getMembershipID();
                long threadId = eventId.getThreadID();
                int memberIdHashCode = Arrays.hashCode(memberId);
                // Fold the long into an int the same way Long.hashCode does.
                int threadIdHashCode = (int)(threadId ^ threadId >>> 32);
                eventHashCode = memberIdHashCode + threadIdHashCode;
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Generated hashcode for event with key={}, memberId={}, threadId={}: {}", this, event.getKey(), Arrays.toString(memberId), threadId, eventHashCode);
                }
                break;
            }
            case PARTITION: {
                eventHashCode = PartitionRegionHelper.isPartitionedRegion(event.getRegion()) ? PartitionedRegionHelper.getHashKey(event) : event.getKey().hashCode();
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Generated partition hashcode for event with key={}: {}", this, event.getKey(), eventHashCode);
                }
                break;
            }
            default: {
                break;
            }
        }
        return eventHashCode;
    }

    /**
     * Stops all child processors in parallel and then closes them.
     *
     * <p>Does nothing if this thread is not alive. Otherwise marks this processor as
     * stopped, runs one {@code SenderStopperCallable} per child on a temporary fixed-size
     * pool of daemon threads, logs each outcome, and finally calls
     * {@link #closeProcessor()}. The temporary pool is shut down on every exit path.
     *
     * @throws InternalGemFireException if the calling thread is interrupted while waiting for
     *         the children to stop; the interrupt status is restored before throwing and the
     *         children are not closed in that case
     */
    @Override
    public void stopProcessing() {
        if (!this.isAlive()) {
            return;
        }
        this.setIsStopped(true);

        final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup.createThreadGroup("ConcurrentSerialGatewaySenderEventProcessor Logger Group", logger);
        ThreadFactory threadFactory = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable task) {
                Thread thread = new Thread(loggingThreadGroup, task, "ConcurrentSerialGatewaySenderEventProcessor Stopper Thread");
                thread.setDaemon(true);
                return thread;
            }
        };

        List<AbstractGatewaySenderEventProcessor.SenderStopperCallable> stopperCallables = new ArrayList<AbstractGatewaySenderEventProcessor.SenderStopperCallable>(this.processors.size());
        for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
            stopperCallables.add(new AbstractGatewaySenderEventProcessor.SenderStopperCallable(serialProcessor));
        }

        ExecutorService stopperService = Executors.newFixedThreadPool(this.processors.size(), threadFactory);
        try {
            List<? extends Future<?>> futures = stopperService.invokeAll(stopperCallables);
            for (Future<?> stopResult : futures) {
                try {
                    boolean stopped = (Boolean)stopResult.get();
                    if (logger.isDebugEnabled()) {
                        logger.debug("ConcurrentSerialGatewaySenderEventProcessor: {} stopped dispatching: {}", stopped ? "Successfully" : "Unsuccesfully", this);
                    }
                }
                catch (ExecutionException e) {
                    // A failure in one stopper must not prevent inspecting the others.
                    logger.warn((Message)LocalizedMessage.create(LocalizedStrings.GatewaySender_0_CAUGHT_EXCEPTION_WHILE_STOPPING_1, new Object[]{this.sender, e.getCause()}));
                }
            }
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers and keep the original exception as cause.
            Thread.currentThread().interrupt();
            throw new InternalGemFireException(e.getMessage(), e);
        }
        finally {
            // Release the stopper threads on every path, including exceptional ones.
            stopperService.shutdown();
        }

        this.closeProcessor();
        if (logger.isDebugEnabled()) {
            logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Stopped dispatching: {}", this);
        }
    }

    /** Closes every child processor, in list order. */
    @Override
    public void closeProcessor() {
        for (SerialGatewaySenderEventProcessor processor : this.processors) {
            processor.closeProcessor();
        }
    }

    /** Pauses every child processor first, then this processor. */
    @Override
    public void pauseDispatching() {
        for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
            serialProcessor.pauseDispatching();
        }
        super.pauseDispatching();
        if (logger.isDebugEnabled()) {
            logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Paused dispatching: {}", this);
        }
    }

    /** Resumes every child processor first, then this processor. */
    @Override
    public void resumeDispatching() {
        for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
            serialProcessor.resumeDispatching();
        }
        super.resumeDispatching();
        if (logger.isDebugEnabled()) {
            logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Resumed dispatching: {}", this);
        }
    }

    /**
     * Returns the queues of all child processors.
     *
     * @return the internal set itself, not a copy
     */
    public Set<RegionQueue> getQueues() {
        return this.queues;
    }

    /** Forwards {@code removeCacheListener()} to every child processor. */
    @Override
    public void removeCacheListener() {
        for (SerialGatewaySenderEventProcessor processor : this.processors) {
            processor.removeCacheListener();
        }
    }

    /** Calls {@code waitForDispatcherToPause()} on each child processor in turn. */
    @Override
    public void waitForDispatcherToPause() {
        for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
            serialProcessor.waitForDispatcherToPause();
        }
    }

    /**
     * Returns the dispatcher of the first child processor.
     *
     * @return the dispatcher of {@code processors.get(0)}
     */
    @Override
    public GatewaySenderEventDispatcher getDispatcher() {
        return this.processors.get(0).getDispatcher();
    }

    /**
     * Intentionally empty: this class sets up no dispatcher of its own, and
     * {@link #getDispatcher()} returns the first child's dispatcher instead.
     */
    @Override
    public void initializeEventDispatcher() {
    }
}

