/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.wan.parallel;

import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.AttributesMutator;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.BucketRegionQueue;
import org.apache.geode.internal.cache.ColocationHelper;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.DiskRegionStats;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.internal.cache.PrimaryBucketException;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.cache.wan.parallel.BucketRegionQueueUnavailableException;
import org.apache.geode.internal.cache.wan.parallel.ParallelQueueRemovalMessage;
import org.apache.geode.internal.cache.wan.parallel.RREventIDResolver;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.size.SingleObjectSizer;
import org.apache.geode.internal.util.concurrent.StoppableCondition;
import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;

public class ParallelGatewaySenderQueue
implements RegionQueue {
    /** Logger shared by this queue. */
    protected static final Logger logger = LogService.getLogger();
    /** Shadow (queue) partitioned region for each user region, keyed by the user region's full path. */
    protected final Map<String, PartitionedRegion> userRegionNameToshadowPRMap = new ConcurrentHashMap<String, PartitionedRegion>();
    // NOTE(review): not used in the visible part of the file; presumably tracks dispatched keys per region - confirm.
    private final Map regionToDispatchedKeysMap = new ConcurrentHashMap();
    /** Lock created from the sender's cancel criterion; owner of {@link #regionToDispatchedKeysMapEmpty}. */
    protected final StoppableReentrantLock buckToDispatchLock;
    /** Condition obtained from {@link #buckToDispatchLock}. */
    private final StoppableCondition regionToDispatchedKeysMapEmpty;
    /** Lock created from the sender's cancel criterion; guards the empty-queue notification. */
    protected final StoppableReentrantLock queueEmptyLock;
    /** Cleared (and {@link #queueEmptyCondition} signalled) by {@code notifyEventProcessorIfRequired()} after a put. */
    private volatile boolean isQueueEmpty = true;
    /** Condition obtained from {@link #queueEmptyLock}. */
    private StoppableCondition queueEmptyCondition;
    /** Statistics object obtained from the sender at construction time. */
    protected final GatewaySenderStats stats;
    // NOTE(review): usage is outside the visible part of the file.
    protected volatile boolean resetLastPeeked = false;
    /** Per-bucket-id holding queues for events whose bucket region queue could not be found during {@code put}. */
    private final ConcurrentMap<Integer, BlockingQueue<GatewaySenderEventImpl>> bucketToTempQueueMap = new ConcurrentHashMap<Integer, BlockingQueue<GatewaySenderEventImpl>>();
    public static final int DEFAULT_MESSAGE_SYNC_INTERVAL = 10;
    /** Starts at the same value as {@link #DEFAULT_MESSAGE_SYNC_INTERVAL}. */
    protected static volatile int messageSyncInterval = 10;
    /** Created and started lazily by {@code start()}. */
    private BatchRemovalThread removalThread = null;
    // NOTE(review): the three peek-related fields below are not used in the visible part of the file.
    protected BlockingQueue<GatewaySenderEventImpl> peekedEvents = new LinkedBlockingQueue<GatewaySenderEventImpl>();
    private BlockingQueue<GatewaySenderEventImpl> peekedEventsProcessing = new LinkedBlockingQueue<GatewaySenderEventImpl>();
    private boolean peekedEventsProcessingInProgress = false;
    /** The gateway sender this queue belongs to. */
    public final AbstractGatewaySender sender;
    public static final int WAIT_CYCLE_SHADOW_BUCKET_LOAD = 10;
    /** Suffix appended to the sender id when building the queue region name for a partitioned user region. */
    public static final String QSTRING = "_PARALLEL_GATEWAY_SENDER_QUEUE";
    /** Fixed-size pool created by {@code initializeConflationThreadPool()}; stays {@code null} when batch conflation is disabled. */
    private ExecutorService conflationExecutor;
    /** Index of this queue; index 0 clears an already existing shadow region, index {@code nDispatcher - 1} enqueues the sender's temp events. */
    protected final int index;
    /** Dispatcher count passed to the constructor; compared against {@link #index}. */
    protected final int nDispatcher;
    /** Factory used to build the meta region passed to the cache when a shadow region is created. */
    private MetaRegionFactory metaRegionFactory;
    // NOTE(review): usage is outside the visible part of the file.
    private int pickBucketId;

    /**
     * Creates the queue with a default {@code MetaRegionFactory}.
     *
     * @param sender the gateway sender that owns this queue
     * @param userRegions the user regions to attach; each must be a {@code PartitionedRegion}
     * @param idx index of this queue
     * @param nDispatcher dispatcher count that {@code idx} is compared against
     */
    public ParallelGatewaySenderQueue(AbstractGatewaySender sender, Set<Region> userRegions, int idx, int nDispatcher) {
        this(sender, userRegions, idx, nDispatcher, new MetaRegionFactory());
    }

    /**
     * Creates the queue and attaches a shadow region for every supplied user region.
     *
     * <p>The user regions are visited in ascending order of their full path. A region that is
     * not a {@code PartitionedRegion} aborts construction with a configuration exception.
     *
     * @param sender the gateway sender that owns this queue
     * @param userRegions the user regions to attach
     * @param idx index of this queue
     * @param nDispatcher dispatcher count that {@code idx} is compared against
     * @param metaRegionFactory factory for the meta region used when a shadow region is created
     * @throws AsyncEventQueueConfigurationException if a non-partitioned region is supplied and
     *         the sender id contains {@code "AsyncEventQueue_"}
     * @throws GatewaySenderConfigurationException if a non-partitioned region is supplied otherwise
     */
    ParallelGatewaySenderQueue(AbstractGatewaySender sender, Set<Region> userRegions, int idx, int nDispatcher, MetaRegionFactory metaRegionFactory) {
        this.metaRegionFactory = metaRegionFactory;
        this.index = idx;
        this.nDispatcher = nDispatcher;
        this.stats = sender.getStatistics();
        this.sender = sender;

        // Work on a copy so the caller's set is left untouched, ordered by full path.
        ArrayList<Region> orderedRegions = new ArrayList<Region>(userRegions);
        Collections.sort(orderedRegions, new Comparator<Region>(){

            @Override
            public int compare(Region left, Region right) {
                String leftPath = left.getFullPath();
                return leftPath.compareTo(right.getFullPath());
            }
        });

        for (Region candidate : orderedRegions) {
            if (!(candidate instanceof PartitionedRegion)) {
                // Only partitioned regions are accepted; the exception type depends on the sender id.
                if (this.sender.getId().contains("AsyncEventQueue_")) {
                    throw new AsyncEventQueueConfigurationException(LocalizedStrings.ParallelAsyncEventQueue_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1.toLocalizedString(AsyncEventQueueImpl.getAsyncEventQueueIdFromSenderId(this.sender.getId()), candidate.getFullPath()));
                }
                throw new GatewaySenderConfigurationException(LocalizedStrings.ParallelGatewaySender_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1.toLocalizedString(this.sender.getId(), candidate.getFullPath()));
            }
            this.addShadowPartitionedRegionForUserPR((PartitionedRegion)candidate);
        }

        this.buckToDispatchLock = new StoppableReentrantLock(sender.getCancelCriterion());
        this.regionToDispatchedKeysMapEmpty = this.buckToDispatchLock.newCondition();
        this.queueEmptyLock = new StoppableReentrantLock(sender.getCancelCriterion());
        this.queueEmptyCondition = this.queueEmptyLock.newCondition();

        if (sender.isBatchConflationEnabled()) {
            this.initializeConflationThreadPool();
        }
    }

    /**
     * Creates and starts the batch removal thread if this queue does not have one yet.
     *
     * <p>The check and the creation run while holding the monitor of
     * {@code ParallelGatewaySenderQueue.class}.
     */
    public void start() {
        synchronized (ParallelGatewaySenderQueue.class) {
            if (this.removalThread != null) {
                // Already started.
                return;
            }
            GemFireCacheImpl gemFireCache = (GemFireCacheImpl)this.sender.getCache();
            this.removalThread = new BatchRemovalThread(gemFireCache, this);
            this.removalThread.start();
        }
    }

    /**
     * Creates (or looks up) the shadow queue region for a replicated user region and records it
     * in {@link #userRegionNameToshadowPRMap} under the user region's full path.
     *
     * <p>The whole operation runs under the sender's life-cycle write lock. Nothing is done when
     * a shadow region is already registered for the user region. When the queue region already
     * exists in the cache and this queue has index 0, its local buckets are cleared instead of
     * creating a new region. The queue with index {@code nDispatcher - 1} additionally asks a
     * running sender to enqueue its temp events.
     *
     * <p>An {@code IOException} or {@code ClassNotFoundException} raised while creating the
     * region is logged at fatal level and not rethrown.
     *
     * @param userRegion the replicated user region to attach a shadow queue region to
     */
    public void addShadowPartitionedRegionForUserRR(DistributedRegion userRegion) {
        this.sender.getLifeCycleLock().writeLock().lock();
        PartitionedRegion prQ = null;
        // Everything after lock() sits inside the try so that no failure can leave the write lock held.
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("addShadowPartitionedRegionForUserRR: Going to create shadowpr for userRegion {}", (Object)userRegion.getFullPath());
            }
            String regionName = userRegion.getFullPath();
            if (this.userRegionNameToshadowPRMap.containsKey(regionName)) {
                // A shadow region is already registered for this user region.
                return;
            }
            GemFireCacheImpl cache = (GemFireCacheImpl)this.sender.getCache();
            String prQName = ParallelGatewaySenderQueue.getQueueName(this.sender.getId(), userRegion.getFullPath());
            prQ = (PartitionedRegion)cache.getRegion(prQName);
            if (prQ == null) {
                AttributesFactory fact = new AttributesFactory();
                fact.setConcurrencyChecksEnabled(false);
                PartitionAttributesFactory pfact = new PartitionAttributesFactory();
                pfact.setTotalNumBuckets(this.sender.getMaxParallelismForReplicatedRegion());
                // A user region without storage gets a shadow region with no local memory.
                int localMaxMemory = userRegion.getDataPolicy().withStorage() ? this.sender.getMaximumQueueMemory() : 0;
                pfact.setLocalMaxMemory(localMaxMemory);
                pfact.setRedundantCopies(3);
                pfact.setPartitionResolver(new RREventIDResolver());
                if (this.sender.isPersistenceEnabled()) {
                    fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
                }
                fact.setDiskStoreName(this.sender.getDiskStoreName());
                // Honour the configured disk-synchronous flag only when persistence is enabled.
                if (this.sender.isPersistenceEnabled()) {
                    fact.setDiskSynchronous(this.sender.isDiskSynchronous());
                } else {
                    fact.setDiskSynchronous(false);
                }
                EvictionAttributes ea = EvictionAttributes.createLIFOMemoryAttributes(this.sender.getMaximumQueueMemory(), EvictionAction.OVERFLOW_TO_DISK);
                fact.setEvictionAttributes(ea);
                fact.setPartitionAttributes(pfact.create());
                RegionAttributes ra = fact.create();
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Attempting to create queue region: {}", (Object)this, (Object)prQName);
                }
                ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, cache, this.sender);
                try {
                    prQ = (PartitionedRegion)cache.createVMRegion(prQName, ra, new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true).setSnapshotInputStream(null).setImageTarget(null));
                    if (logger.isDebugEnabled()) {
                        logger.debug("Region created  : {} partition Attributes : {}", (Object)prQ, (Object)prQ.getPartitionAttributes());
                    }
                    prQ.enableConflation(this.sender.isBatchConflationEnabled());
                    if (prQ.getLocalMaxMemory() != 0) {
                        // NOTE(review): this walk over the bucket set has an empty loop body; it is
                        // kept as-is in case the traversal itself matters - confirm and remove if not.
                        Iterator<Integer> itr = prQ.getRegionAdvisor().getBucketSet().iterator();
                        while (itr.hasNext()) {
                            itr.next();
                        }
                    }
                }
                catch (IOException | ClassNotFoundException veryUnLikely) {
                    logger.fatal((Message)LocalizedMessage.create(LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0, this.getClass()), (Throwable)veryUnLikely);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Created queue region: {}", (Object)this, (Object)prQ);
                }
            } else if (this.index == 0) {
                this.handleShadowPRExistsScenario(cache, prQ);
            }
            if (this.index == this.nDispatcher - 1 && this.sender.isRunning()) {
                this.sender.enqueueTempEvents();
            }
        }
        finally {
            // Register whatever region was obtained, even on failure, then always release the lock.
            if (prQ != null) {
                this.userRegionNameToshadowPRMap.put(userRegion.getFullPath(), prQ);
            }
            this.sender.getLifeCycleLock().writeLock().unlock();
        }
    }

    /**
     * Returns the region-specific part of a queue region name.
     *
     * <p>The argument is currently ignored and the result is always the empty string.
     *
     * @param fullPath full path of the user region (unused)
     * @return the empty string
     */
    private static String convertPathToName(String fullPath) {
        return "";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    public void addShadowPartitionedRegionForUserPR(PartitionedRegion userPR) {
        block31: {
            block32: {
                block33: {
                    block29: {
                        block30: {
                            block27: {
                                block28: {
                                    block25: {
                                        block26: {
                                            if (ParallelGatewaySenderQueue.logger.isDebugEnabled()) {
                                                ParallelGatewaySenderQueue.logger.debug("{} addShadowPartitionedRegionForUserPR: Attempting to create queue region: {}", (Object)this, (Object)userPR.getDisplayName());
                                            }
                                            this.sender.getLifeCycleLock().writeLock().lock();
                                            prQ = null;
                                            try {
                                                regionName = userPR.getFullPath();
                                                leaderRegionName = ColocationHelper.getLeaderRegion(userPR).getFullPath();
                                                if (regionName.equals(leaderRegionName)) break block25;
                                                if (!this.userRegionNameToshadowPRMap.containsKey(leaderRegionName)) {
                                                    this.addShadowPartitionedRegionForUserPR(ColocationHelper.getLeaderRegion(userPR));
                                                }
                                                if (prQ == null) break block26;
                                                this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
                                            }
                                            catch (Throwable var15_15) {
                                                if (prQ != null) {
                                                    this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
                                                }
                                                if (this.index == this.nDispatcher - 1 && this.sender.isRunning()) {
                                                    this.sender.enqueueTempEvents();
                                                }
                                                this.afterRegionAdd(userPR);
                                                this.sender.getLifeCycleLock().writeLock().unlock();
                                                throw var15_15;
                                            }
                                        }
                                        if (this.index == this.nDispatcher - 1 && this.sender.isRunning()) {
                                            this.sender.enqueueTempEvents();
                                        }
                                        this.afterRegionAdd(userPR);
                                        this.sender.getLifeCycleLock().writeLock().unlock();
                                        return;
                                    }
                                    if (!this.userRegionNameToshadowPRMap.containsKey(regionName)) break block27;
                                    if (prQ == null) break block28;
                                    this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
                                }
                                if (this.index == this.nDispatcher - 1 && this.sender.isRunning()) {
                                    this.sender.enqueueTempEvents();
                                }
                                this.afterRegionAdd(userPR);
                                this.sender.getLifeCycleLock().writeLock().unlock();
                                return;
                            }
                            if (userPR.getDataPolicy().withPersistence() && !this.sender.isPersistenceEnabled()) {
                                throw new GatewaySenderException(LocalizedStrings.ParallelGatewaySenderQueue_NON_PERSISTENT_GATEWAY_SENDER_0_CAN_NOT_BE_ATTACHED_TO_PERSISTENT_REGION_1.toLocalizedString(new Object[]{this.sender.getId(), userPR.getFullPath()}));
                            }
                            cache = (GemFireCacheImpl)this.sender.getCache();
                            isAccessor = userPR.getLocalMaxMemory() == 0;
                            prQName = this.sender.getId() + "_PARALLEL_GATEWAY_SENDER_QUEUE" + ParallelGatewaySenderQueue.convertPathToName(userPR.getFullPath());
                            prQ = (PartitionedRegion)cache.getRegion(prQName);
                            if (prQ != null) ** GOTO lbl103
                            fact = new AttributesFactory<K, V>();
                            fact.setConcurrencyChecksEnabled(false);
                            pfact = new PartitionAttributesFactory<K, V>();
                            pfact.setTotalNumBuckets(userPR.getTotalNumberOfBuckets());
                            pfact.setRedundantCopies(userPR.getRedundantCopies());
                            pfact.setColocatedWith(regionName);
                            localMaxMemory = isAccessor != false ? 0 : this.sender.getMaximumQueueMemory();
                            pfact.setLocalMaxMemory(localMaxMemory);
                            pfact.setStartupRecoveryDelay(userPR.getPartitionAttributes().getStartupRecoveryDelay());
                            pfact.setRecoveryDelay(userPR.getPartitionAttributes().getRecoveryDelay());
                            if (this.sender.isPersistenceEnabled() && !isAccessor) {
                                fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
                            }
                            fact.setDiskStoreName(this.sender.getDiskStoreName());
                            if (this.sender.isPersistenceEnabled()) {
                                fact.setDiskSynchronous(this.sender.isDiskSynchronous());
                            } else {
                                fact.setDiskSynchronous(false);
                            }
                            ea = EvictionAttributes.createLIFOMemoryAttributes(this.sender.getMaximumQueueMemory(), EvictionAction.OVERFLOW_TO_DISK);
                            fact.setEvictionAttributes(ea);
                            fact.setPartitionAttributes(pfact.create());
                            ra = fact.create();
                            if (ParallelGatewaySenderQueue.logger.isDebugEnabled()) {
                                ParallelGatewaySenderQueue.logger.debug("{}: Attempting to create queue region: {}", (Object)this, (Object)prQName);
                            }
                            meta = this.metaRegionFactory.newMetataRegion(cache, prQName, ra, this.sender);
                            prQ = (PartitionedRegion)cache.createVMRegion(prQName, ra, new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true).setInternalRegion(true).setSnapshotInputStream(null).setImageTarget(null));
                            prQ.enableConflation(this.sender.isBatchConflationEnabled());
                            if (!isAccessor) break block29;
                            if (prQ == null) break block30;
                            this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
                        }
                        if (this.index == this.nDispatcher - 1 && this.sender.isRunning()) {
                            this.sender.enqueueTempEvents();
                        }
                        this.afterRegionAdd(userPR);
                        this.sender.getLifeCycleLock().writeLock().unlock();
                        return;
                    }
                    prQ.shadowPRWaitForBucketRecovery();
                    {
                        catch (IOException | ClassNotFoundException veryUnLikely) {
                            ParallelGatewaySenderQueue.logger.fatal((Message)LocalizedMessage.create(LocalizedStrings.SingleWriteSingleReadRegionQueue_UNEXPECTED_EXCEPTION_DURING_INIT_OF_0, this.getClass()), (Throwable)veryUnLikely);
                        }
                    }
                    if (ParallelGatewaySenderQueue.logger.isDebugEnabled()) {
                        ParallelGatewaySenderQueue.logger.debug("{}: Created queue region: {}", (Object)this, (Object)prQ);
                    }
                    break block31;
lbl103:
                    // 1 sources

                    if (!isAccessor) break block32;
                    if (prQ == null) break block33;
                    this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
                }
                if (this.index == this.nDispatcher - 1 && this.sender.isRunning()) {
                    this.sender.enqueueTempEvents();
                }
                this.afterRegionAdd(userPR);
                this.sender.getLifeCycleLock().writeLock().unlock();
                return;
            }
            if (this.index != 0) break block31;
            this.handleShadowPRExistsScenario(cache, prQ);
        }
        if (prQ != null) {
            this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
        }
        if (this.index == this.nDispatcher - 1 && this.sender.isRunning()) {
            this.sender.enqueueTempEvents();
        }
        this.afterRegionAdd(userPR);
        this.sender.getLifeCycleLock().writeLock().unlock();
    }

    /**
     * Handles the case where the shadow queue region was found in the cache instead of being
     * created: every bucket region hosted locally by its data store is cleared.
     *
     * @param cache the cache the region was retrieved from (not used here)
     * @param prQ the existing shadow queue region
     */
    private void handleShadowPRExistsScenario(Cache cache, PartitionedRegion prQ) {
        boolean debugOn = logger.isDebugEnabled();
        if (debugOn) {
            logger.debug("{}: No need to create the region as the region has been retrieved: {}", (Object)this, (Object)prQ);
        }
        for (BucketRegion localBucket : prQ.getDataStore().getAllLocalBucketRegions()) {
            localBucket.clear();
        }
    }

    /**
     * Hook called at the end of {@code addShadowPartitionedRegionForUserPR}, while the sender's
     * life-cycle write lock is still held. The implementation here does nothing.
     *
     * @param userPR the user region that was just processed
     */
    protected void afterRegionAdd(PartitionedRegion userPR) {
    }

    /**
     * Creates the conflation executor: a fixed pool with one thread per available processor.
     * Its workers are daemon threads named "WAN Queue Conflation Thread" that belong to a
     * dedicated logging thread group.
     */
    private void initializeConflationThreadPool() {
        final LoggingThreadGroup conflationGroup = LoggingThreadGroup.createThreadGroup("WAN Queue Conflation Logger Group", logger);
        ThreadFactory daemonFactory = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable task) {
                Thread worker = new Thread(conflationGroup, task, "WAN Queue Conflation Thread");
                worker.setDaemon(true);
                return worker;
            }
        };
        int poolSize = Runtime.getRuntime().availableProcessors();
        this.conflationExecutor = Executors.newFixedThreadPool(poolSize, daemonFactory);
    }

    /**
     * Shuts the conflation executor down, if there is one.
     *
     * <p>An orderly shutdown is requested first and given one second. If tasks are still running
     * the pool is shut down forcibly and given another second; failing that, a warning is
     * logged. If the calling thread is interrupted while waiting, the pool is shut down forcibly
     * and the interrupt status is restored.
     *
     * @param sender sender named in the warning message; {@code null} is reported as "all"
     */
    private void cleanupConflationThreadPool(AbstractGatewaySender sender) {
        if (this.conflationExecutor == null) {
            return;
        }
        this.conflationExecutor.shutdown();
        try {
            boolean drained = this.conflationExecutor.awaitTermination(1L, TimeUnit.SECONDS);
            if (drained) {
                return;
            }
            // The orderly shutdown did not finish in time: cancel what is left and wait once more.
            this.conflationExecutor.shutdownNow();
            boolean stopped = this.conflationExecutor.awaitTermination(1L, TimeUnit.SECONDS);
            if (stopped) {
                return;
            }
            logger.warn((Message)LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderQueue_COULD_NOT_TERMINATE_CONFLATION_THREADPOOL, sender == null ? "all" : sender));
        }
        catch (InterruptedException e) {
            this.conflationExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds a gateway sender event to the bucket region queue of its shadow region.
     *
     * <p>The shadow region is looked up by the event region's full path for a distributed-region
     * event, otherwise by the full path of the leader of the region's colocation chain. The
     * queue key is the event id for a distributed-region event and the shadow key otherwise.
     *
     * <p>If the bucket region queue is not hosted by the local data store, it is looked up in
     * the cache by path. If it cannot be found there either, a non-distributed-region event
     * whose shadow bucket is not flagged as destroyed is parked in a per-bucket temp queue.
     *
     * <p>{@code notifyEventProcessorIfRequired()} is called once the bucket lookup has been
     * attempted, but not on the two early {@code return false} paths.
     *
     * @param object the event; must be a {@code GatewaySenderEventImpl}
     * @return {@code true} if the event was handed to a bucket region queue or to a temp queue;
     *         {@code false} if no shadow region is registered for the region, the shadow key is
     *         {@code -1}, the bucket is flagged as destroyed, or a distributed-region event's
     *         bucket could not be found
     */
    @Override
    public boolean put(Object object) throws InterruptedException, CacheException {
        boolean putDone;
        block39: {
            boolean isDebugEnabled = logger.isDebugEnabled();
            putDone = false;
            GatewaySenderEventImpl value = (GatewaySenderEventImpl)object;
            boolean isDREvent = this.isDREvent(value);
            Region<?, ?> region = value.getRegion();
            String regionPath = null;
            // Shadow regions of partitioned regions are looked up via the colocation leader.
            regionPath = isDREvent ? region.getFullPath() : ColocationHelper.getLeaderRegion((PartitionedRegion)region).getFullPath();
            if (isDebugEnabled) {
                logger.debug("Put is for the region {}", region);
            }
            // No shadow region registered for this path: warn and reject the event.
            if (!this.userRegionNameToshadowPRMap.containsKey(regionPath)) {
                if (isDebugEnabled) {
                    logger.debug("The userRegionNameToshadowPRMap is {}", this.userRegionNameToshadowPRMap);
                }
                logger.warn((Message)LocalizedMessage.create(LocalizedStrings.NOT_QUEUING_AS_USERPR_IS_NOT_YET_CONFIGURED, value));
                return false;
            }
            PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(regionPath);
            int bucketId = value.getBucketId();
            Serializable key = null;
            if (!isDREvent) {
                key = value.getShadowKey();
                // A shadow key of -1 means the event is not queued.
                if ((Long)key == -1L) {
                    if (isDebugEnabled) {
                        logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {}", (Object)key, (Object)value);
                    }
                    return false;
                }
            } else {
                key = value.getEventId();
            }
            if (isDebugEnabled) {
                logger.debug("ParallelGatewaySenderOrderedQueue putting key {} : Value : {}", (Object)key, (Object)value);
            }
            AbstractBucketRegionQueue brq = (AbstractBucketRegionQueue)prQ.getDataStore().getLocalBucketById(bucketId);
            try {
                if (brq == null) {
                    // The data store does not know the bucket; fall back to a lookup by path.
                    // The thread's init level requirement is set to 1 for the duration of this
                    // branch and restored in the finally block below.
                    int oldLevel = LocalRegion.setThreadInitLevelRequirement(1);
                    try {
                        String bucketFullPath = "/__PR/" + prQ.getBucketName(bucketId);
                        brq = (AbstractBucketRegionQueue)prQ.getCache().getRegionByPath(bucketFullPath);
                        if (isDebugEnabled) {
                            logger.debug("ParallelGatewaySenderOrderedQueue : The bucket in the cache is bucketRegionName : {} bucket : {}", (Object)bucketFullPath, (Object)brq);
                        }
                        if (brq != null) {
                            // Found by path: enqueue while holding the bucket's initialization read lock.
                            brq.getInitializationLock().readLock().lock();
                            try {
                                this.putIntoBucketRegionQueue(brq, key, value);
                                putDone = true;
                                break block39;
                            }
                            finally {
                                brq.getInitializationLock().readLock().unlock();
                            }
                        }
                        // Distributed-region events are not parked in a temp queue.
                        if (isDREvent) {
                            break block39;
                        }
                        if (prQ.getColocatedWithRegion().getRegionAdvisor().getBucketAdvisor(bucketId).getShadowBucketDestroyed()) {
                            if (isDebugEnabled) {
                                logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {} as shadowPR bucket is destroyed.", (Object)key, (Object)value);
                            }
                            break block39;
                        }
                        // Get or create the temp queue for this bucket under the map's monitor.
                        LinkedBlockingQueue<GatewaySenderEventImpl> tempQueue = null;
                        Object object2 = this.bucketToTempQueueMap;
                        synchronized (object2) {
                            tempQueue = (LinkedBlockingQueue<GatewaySenderEventImpl>)this.bucketToTempQueueMap.get(bucketId);
                            if (tempQueue == null) {
                                tempQueue = new LinkedBlockingQueue<GatewaySenderEventImpl>();
                                this.bucketToTempQueueMap.put(bucketId, tempQueue);
                            }
                        }
                        // Under the temp queue's monitor, look the bucket up once more before parking the event.
                        object2 = tempQueue;
                        synchronized (object2) {
                            brq = (AbstractBucketRegionQueue)prQ.getCache().getRegionByPath(bucketFullPath);
                            if (brq != null) {
                                brq.getInitializationLock().readLock().lock();
                                try {
                                    this.putIntoBucketRegionQueue(brq, key, value);
                                    putDone = true;
                                }
                                finally {
                                    brq.getInitializationLock().readLock().unlock();
                                }
                            } else {
                                tempQueue.add(value);
                                putDone = true;
                                if (isDebugEnabled) {
                                    logger.debug("The value {} is enqueued to the tempQueue for the BucketRegionQueue.", (Object)value);
                                }
                            }
                            break block39;
                        }
                    }
                    finally {
                        LocalRegion.setThreadInitLevelRequirement(oldLevel);
                    }
                }
                // The bucket is hosted locally: enqueue unless it is (being) destroyed.
                boolean thisbucketDestroyed = false;
                thisbucketDestroyed = !isDREvent ? prQ.getColocatedWithRegion().getRegionAdvisor().getBucketAdvisor(bucketId).getShadowBucketDestroyed() || brq.isDestroyed() : brq.isDestroyed();
                if (!thisbucketDestroyed) {
                    this.putIntoBucketRegionQueue(brq, key, value);
                    putDone = true;
                } else if (isDebugEnabled) {
                    logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {} as shadowPR bucket is destroyed.", (Object)key, (Object)value);
                }
            }
            finally {
                // Runs for every path that reached the bucket lookup, including the labelled breaks.
                this.notifyEventProcessorIfRequired();
            }
        }
        return putDone;
    }

    /**
     * Signals {@code queueEmptyCondition} if the queue is currently flagged as empty.
     *
     * <p>The flag is first read without the lock as a cheap pre-check. It is read again while
     * holding {@code queueEmptyLock}; only if it is still set is it cleared and the condition
     * signalled.
     */
    public void notifyEventProcessorIfRequired() {
        if (!this.isQueueEmpty) {
            // Nothing is flagged as waiting on an empty queue.
            return;
        }
        this.queueEmptyLock.lock();
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("Going to notify, isQueueEmpty {}", (Object)this.isQueueEmpty);
            }
            // Re-check under the lock: the flag may have changed since the unlocked read.
            if (this.isQueueEmpty) {
                this.isQueueEmpty = false;
                this.queueEmptyCondition.signal();
            }
        }
        finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Notified!, isQueueEmpty {}", (Object)this.isQueueEmpty);
            }
            this.queueEmptyLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds {@code value} under {@code key} to the given bucket queue.
     *
     * <p>A {@code null} queue means nothing is added. {@link BucketNotFoundException} and
     * {@link ForceReattemptException} are logged at debug level and not rethrown. Whenever the
     * value was not added, for whatever reason, it is released.
     *
     * @param brq the target bucket queue, may be {@code null}
     * @param key the key to store the event under
     * @param value the event to enqueue
     */
    private void putIntoBucketRegionQueue(AbstractBucketRegionQueue brq, Object key, GatewaySenderEventImpl value) {
        boolean enqueued = false;
        try {
            enqueued = brq != null && brq.addToQueue(key, value);
        }
        catch (BucketNotFoundException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("For bucket {} the current bucket redundancy is {}", (Object)brq.getId(), (Object)brq.getPartitionedRegion().getRegionAdvisor().getBucketAdvisor(brq.getId()).getBucketRedundancy());
            }
        }
        catch (ForceReattemptException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("getInitializedBucketForId: Got ForceReattemptException for {} for bucket = {}", (Object)this, (Object)brq.getId());
            }
        }
        finally {
            // The event was not handed over to the queue, so release it here.
            if (!enqueued) {
                value.release();
            }
        }
    }

    /**
     * Returns the only shadow region registered in {@code userRegionNameToshadowPRMap}.
     *
     * <p>The values are copied into an array once, and both the size test and the element access
     * use that copy. Reading {@code size()} and {@code toArray()} separately could fail with an
     * out-of-bounds access if the map changed in between (entries are removed by
     * {@code removeShadowPR}).
     *
     * @return the single shadow region, or {@code null} when there are zero or several
     */
    @Override
    public Region getRegion() {
        Object[] shadowRegions = this.userRegionNameToshadowPRMap.values().toArray();
        return shadowRegions.length == 1 ? (Region)shadowRegions[0] : null;
    }

    /**
     * Looks up the shadow region registered under the given user region path.
     *
     * @param fullpath the key used in {@code userRegionNameToshadowPRMap}
     * @return the mapped shadow region, or {@code null} if none is registered
     */
    public PartitionedRegion getRegion(String fullpath) {
        PartitionedRegion shadowRegion = this.userRegionNameToshadowPRMap.get(fullpath);
        return shadowRegion;
    }

    /**
     * Unregisters the shadow region stored under {@code fullpath} while holding the sender's
     * life-cycle write lock, and marks the sender's temp-queue events as not fully enqueued.
     *
     * @param fullpath the key used in {@code userRegionNameToshadowPRMap}
     * @return the region that was removed, or {@code null} if none was registered
     */
    public PartitionedRegion removeShadowPR(String fullpath) {
        // Acquire the lock before entering the try block: if acquisition itself fails, the
        // finally block must not attempt to release a lock this thread never obtained.
        this.sender.getLifeCycleLock().writeLock().lock();
        try {
            this.sender.setEnqueuedAllTempQueueEvents(false);
            return this.userRegionNameToshadowPRMap.remove(fullpath);
        }
        finally {
            this.sender.getLifeCycleLock().writeLock().unlock();
        }
    }

    /**
     * @return the executor that {@code conflateEvent} submits its conflation handlers to
     */
    public ExecutorService getConflationExecutor() {
        ExecutorService executor = this.conflationExecutor;
        return executor;
    }

    /**
     * @return a new {@link HashSet} holding the shadow regions currently registered; changes to
     *         the returned set do not affect this queue
     */
    public Set<PartitionedRegion> getRegions() {
        Set<PartitionedRegion> shadowRegions = new HashSet<PartitionedRegion>();
        shadowRegions.addAll(this.userRegionNameToshadowPRMap.values());
        return shadowRegions;
    }

    /**
     * Picks one of the registered shadow regions at random.
     *
     * <p>The values are copied into an array once, and the random index is drawn from that
     * array's length. Drawing the index from {@code size()} and then indexing a separately taken
     * {@code toArray()} could go out of bounds if the map shrank in between.
     *
     * @return a randomly chosen shadow region, or {@code null} when none is registered
     */
    protected PartitionedRegion getRandomShadowPR() {
        Object[] shadowRegions = this.userRegionNameToshadowPRMap.values().toArray();
        if (shadowRegions.length == 0) {
            return null;
        }
        int randomIndex = new Random().nextInt(shadowRegions.length);
        return (PartitionedRegion)shadowRegions[randomIndex];
    }

    /**
     * @param event the event to inspect
     * @return {@code true} when the event's region is a {@link DistributedRegion}
     */
    private boolean isDREvent(GatewaySenderEventImpl event) {
        Object eventRegion = event.getRegion();
        return eventRegion instanceof DistributedRegion;
    }

    /**
     * Not supported by this queue.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Object take() throws CacheException, InterruptedException {
        UnsupportedOperationException unsupported = new UnsupportedOperationException();
        throw unsupported;
    }

    /**
     * Picks a random local primary bucket queue of a randomly chosen shadow region.
     *
     * <p>Returns {@code null} instead of failing with a {@code NullPointerException} when the
     * region has no data store or the chosen bucket is no longer available locally. Other methods
     * of this class already treat both lookups as nullable ({@code localSize},
     * {@code getRandomPrimaryBucket}).
     *
     * @return a bucket queue that reports {@code isReadyForPeek()}, or {@code null} if there is
     *         no shadow region, no data store, no local primary bucket, the bucket is gone, or
     *         the chosen queue is not ready
     */
    private final BucketRegionQueue getRandomBucketRegionQueue() {
        PartitionedRegion prQ = this.getRandomShadowPR();
        if (prQ == null) {
            return null;
        }
        PartitionedRegionDataStore ds = prQ.getDataStore();
        if (ds == null) {
            return null;
        }
        List<Integer> buckets = new ArrayList<Integer>(ds.getAllLocalPrimaryBucketIds());
        if (buckets.isEmpty()) {
            return null;
        }
        int brqId = buckets.get(new Random().nextInt(buckets.size()));
        BucketRegionQueue brq = (BucketRegionQueue)ds.getLocalBucketById(brqId);
        // The bucket id list is a snapshot; the bucket may no longer be present locally.
        if (brq != null && brq.isReadyForPeek()) {
            return brq;
        }
        return null;
    }

    /**
     * Tells whether any registered shadow region has at least one local bucket region.
     *
     * <p>Shadow regions without a data store are skipped rather than dereferenced, matching the
     * {@code getDataStore() != null} guard used by {@code localSize}.
     *
     * @return {@code true} as soon as one shadow region reports a non-empty set of local bucket
     *         regions, {@code false} otherwise
     */
    protected boolean areLocalBucketQueueRegionsPresent() {
        for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
            PartitionedRegionDataStore dataStore = prQ.getDataStore();
            if (dataStore != null && dataStore.getAllLocalBucketRegions().size() > 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Selects the id of a local primary bucket handled by this processor that is ready to be
     * peeked.
     *
     * <p>Candidates are the local buckets that are primary and whose id modulo
     * {@code nDispatcher} equals {@code index}. They are tried in turn, starting at the
     * {@code pickBucketId} cursor (which wraps to 0 and advances on every attempt), for at most
     * one full pass.
     *
     * @param prQ the shadow region to search, may be {@code null}
     * @return the id of a ready bucket queue, or {@code -1} if {@code prQ} is {@code null} or no
     *         candidate is ready
     */
    protected int getRandomPrimaryBucket(PartitionedRegion prQ) {
        if (prQ == null) {
            return -1;
        }
        Set<Map.Entry<Integer, BucketRegion>> localBuckets = prQ.getDataStore().getAllLocalBuckets();
        ArrayList<Integer> candidates = new ArrayList<Integer>();
        for (Map.Entry<Integer, BucketRegion> entry : localBuckets) {
            BucketRegion bucket = entry.getValue();
            if (bucket.getBucketAdvisor().isPrimary()) {
                int candidateId = bucket.getId();
                if (candidateId % this.nDispatcher == this.index) {
                    candidates.add(candidateId);
                }
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("getRandomPrimaryBucket: total {} for this processor: {}", (Object)localBuckets.size(), (Object)candidates.size());
        }
        for (int attemptsLeft = candidates.size(); attemptsLeft > 0; --attemptsLeft) {
            if (this.pickBucketId >= candidates.size()) {
                this.pickBucketId = 0;
            }
            BucketRegionQueue queue = this.getBucketRegionQueueByBucketId(prQ, (Integer)candidates.get(this.pickBucketId++));
            if (queue != null && queue.isReadyForPeek()) {
                return queue.getId();
            }
        }
        return -1;
    }

    /**
     * Not supported by this queue.
     *
     * @param batchSize ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public List take(int batchSize) throws CacheException, InterruptedException {
        UnsupportedOperationException unsupported = new UnsupportedOperationException();
        throw unsupported;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the next event from {@code peekedEvents} and destroys it in its shadow bucket
     * queue.
     *
     * <p>The shadow region, bucket id and key are derived from the event. If the event still
     * references its region, that reference is used; otherwise the region is looked up in the
     * cache by the event's region path, and nothing is destroyed if it is missing or destroyed.
     * The event is released in all cases.
     *
     * <p>Fix: in the path-lookup branch the result of {@code event.getBucketId()} used to be
     * discarded, so {@code destroyEventFromQueue} was called with bucket id {@code -1}. The
     * value is now assigned.
     */
    @Override
    public void remove() throws CacheException {
        if (this.peekedEvents.isEmpty()) {
            return;
        }
        GatewaySenderEventImpl event = (GatewaySenderEventImpl)this.peekedEvents.remove();
        try {
            PartitionedRegion prQ = null;
            int bucketId = -1;
            Serializable key = null;
            if (event.getRegion() != null) {
                if (this.isDREvent(event)) {
                    prQ = this.userRegionNameToshadowPRMap.get(event.getRegion().getFullPath());
                    bucketId = event.getEventId().getBucketID();
                    key = event.getEventId();
                } else {
                    prQ = this.userRegionNameToshadowPRMap.get(ColocationHelper.getLeaderRegion((PartitionedRegion)event.getRegion()).getFullPath());
                    bucketId = event.getBucketId();
                    key = event.getShadowKey();
                }
            } else {
                // The event no longer references its region; resolve it by path instead.
                String regionPath = event.getRegionPath();
                GemFireCacheImpl cache = (GemFireCacheImpl)this.sender.getCache();
                PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionPath);
                if (region != null && !region.isDestroyed()) {
                    if (region instanceof DistributedRegion) {
                        prQ = this.userRegionNameToshadowPRMap.get(region.getFullPath());
                        bucketId = event.getBucketId();
                        key = event.getEventId();
                    } else {
                        prQ = this.userRegionNameToshadowPRMap.get(ColocationHelper.getLeaderRegion(region).getFullPath());
                        bucketId = event.getBucketId();
                        key = event.getShadowKey();
                    }
                }
            }
            if (prQ != null) {
                this.destroyEventFromQueue(prQ, bucketId, key);
            }
        }
        finally {
            event.release();
        }
    }

    /**
     * Destroys {@code key} in the local bucket queue if this member is primary for the bucket,
     * then records the key as removed via {@code addRemovedEvent}.
     *
     * <p>Nothing happens when this member is not primary. {@link EntryNotFoundException},
     * {@link ForceReattemptException}, {@link PrimaryBucketException} and
     * {@link RegionDestroyedException} are logged at debug level only; the key is still recorded
     * as removed afterwards. The queue-size statistic is decremented unless the destroy threw.
     *
     * @param prQ the shadow region owning the bucket
     * @param bucketId the bucket that holds the key
     * @param key the key to destroy
     */
    private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, Object key) {
        if (!prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary()) {
            return;
        }
        BucketRegionQueue bucketQueue = this.getBucketRegionQueueByBucketId(prQ, bucketId);
        try {
            if (bucketQueue != null) {
                bucketQueue.destroyKey(key);
            }
            this.stats.decQueueSize();
        }
        catch (EntryNotFoundException e) {
            if (!this.sender.isBatchConflationEnabled() && logger.isDebugEnabled()) {
                logger.debug("ParallelGatewaySenderQueue#remove: Got EntryNotFoundException while removing key {} for {} for bucket = {} for GatewaySender {}", key, (Object)this, (Object)bucketId, (Object)this.sender);
            }
        }
        catch (ForceReattemptException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("Bucket :{} moved to other member", (Object)bucketId);
            }
        }
        catch (PrimaryBucketException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("Primary bucket :{} moved to other member", (Object)bucketId);
            }
        }
        catch (RegionDestroyedException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}", key, (Object)bucketId, (Object)prQ.getFullPath());
            }
        }
        // Reached on success and after every handled exception.
        this.addRemovedEvent(prQ, bucketId, key);
    }

    /**
     * Sets the {@code resetLastPeeked} flag (read by {@code addPeekedEvents}) and discards any
     * partially processed re-delivery state: the in-progress flag is cleared and
     * {@code peekedEventsProcessing} is emptied.
     */
    public void resetLastPeeked() {
        this.resetLastPeeked = true;
        // Drop whatever re-delivery was under way.
        this.peekedEventsProcessingInProgress = false;
        this.peekedEventsProcessing.clear();
    }

    /**
     * Peeks a single object from a ready primary bucket of a randomly chosen shadow region.
     *
     * @return the peeked object, or {@code null} when there is no shadow region, no local bucket
     *         region, no ready primary bucket, or when the bucket lookup or peek fails with
     *         {@link BucketRegionQueueUnavailableException} or {@link ForceReattemptException}
     */
    @Override
    public Object peek() throws InterruptedException, CacheException {
        PartitionedRegion prQ = this.getRandomShadowPR();
        if (prQ == null || prQ.getDataStore().getAllLocalBucketRegions().size() <= 0) {
            return null;
        }
        int bucketId = this.getRandomPrimaryBucket(prQ);
        if (bucketId == -1) {
            return null;
        }
        Object peeked = null;
        try {
            BucketRegionQueue brq = (BucketRegionQueue)prQ.getDataStore().getInitializedBucketForId(null, bucketId);
            peeked = brq.peek();
        }
        catch (BucketRegionQueueUnavailableException e) {
            return peeked;
        }
        catch (ForceReattemptException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("Remove: Got ForceReattemptException for {} for bucke = {}", (Object)this, (Object)bucketId);
            }
        }
        return peeked;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records {@code key} as dispatched for the given region and bucket in
     * {@code regionToDispatchedKeysMap}, under {@code buckToDispatchLock}.
     *
     * <p>Does nothing when the lock field is {@code null}. If the map was empty before this call,
     * {@code regionToDispatchedKeysMapEmpty} is signalled after the key has been added.
     *
     * @param prQ the shadow region; its full path is the outer map key
     * @param bucketId the bucket the key belongs to
     * @param key the dispatched key
     */
    protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
        StoppableReentrantLock dispatchLock = this.buckToDispatchLock;
        if (dispatchLock == null) {
            return;
        }
        dispatchLock.lock();
        boolean mapWasEmpty = this.regionToDispatchedKeysMap.isEmpty();
        try {
            ConcurrentHashMap keysByBucket = (ConcurrentHashMap)this.regionToDispatchedKeysMap.get(prQ.getFullPath());
            if (keysByBucket == null) {
                keysByBucket = new ConcurrentHashMap();
                this.regionToDispatchedKeysMap.put(prQ.getFullPath(), keysByBucket);
            }
            this.addRemovedEventToMap(keysByBucket, bucketId, key);
            if (mapWasEmpty) {
                this.regionToDispatchedKeysMapEmpty.signal();
            }
        }
        finally {
            dispatchLock.unlock();
        }
    }

    /**
     * Appends {@code key} to the list of dispatched keys stored for {@code bucketId}, creating
     * the list on first use.
     *
     * <p>The stored value is read through the {@link List} interface rather than cast to
     * {@link ArrayList}: {@code addRemovedEventsToMap} may store a caller-supplied list of any
     * {@code List} implementation in the same map, which would make an {@code ArrayList} cast
     * fail with a {@code ClassCastException}.
     *
     * @param bucketIdToDispatchedKeys map from bucket id to the list of dispatched keys
     * @param bucketId the bucket the key belongs to
     * @param key the dispatched key
     */
    private void addRemovedEventToMap(Map bucketIdToDispatchedKeys, int bucketId, Object key) {
        List<Object> dispatchedKeys = (List<Object>)bucketIdToDispatchedKeys.get(bucketId);
        if (dispatchedKeys == null) {
            dispatchedKeys = new ArrayList<Object>();
            bucketIdToDispatchedKeys.put(bucketId, dispatchedKeys);
        }
        dispatchedKeys.add(key);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a batch of keys as dispatched for the given region and bucket in
     * {@code regionToDispatchedKeysMap}, under {@code buckToDispatchLock}.
     *
     * <p>If the map was empty before this call, {@code regionToDispatchedKeysMapEmpty} is
     * signalled after the keys have been added.
     *
     * @param prQ the shadow region; its full path is the outer map key
     * @param bucketId the bucket the keys belong to
     * @param shadowKeys the dispatched keys
     */
    protected void addRemovedEvents(PartitionedRegion prQ, int bucketId, List<Object> shadowKeys) {
        this.buckToDispatchLock.lock();
        boolean mapWasEmpty = this.regionToDispatchedKeysMap.isEmpty();
        try {
            ConcurrentHashMap keysByBucket = (ConcurrentHashMap)this.regionToDispatchedKeysMap.get(prQ.getFullPath());
            if (keysByBucket == null) {
                keysByBucket = new ConcurrentHashMap();
                this.regionToDispatchedKeysMap.put(prQ.getFullPath(), keysByBucket);
            }
            this.addRemovedEventsToMap(keysByBucket, bucketId, shadowKeys);
            if (mapWasEmpty) {
                this.regionToDispatchedKeysMapEmpty.signal();
            }
        }
        finally {
            this.buckToDispatchLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a batch of keys as dispatched for the region identified by {@code prQPath} and the
     * given bucket in {@code regionToDispatchedKeysMap}, under {@code buckToDispatchLock}.
     *
     * <p>If the map was empty before this call, {@code regionToDispatchedKeysMapEmpty} is
     * signalled after the keys have been added.
     *
     * @param prQPath the outer map key
     * @param bucketId the bucket the keys belong to
     * @param shadowKeys the dispatched keys
     */
    protected void addRemovedEvents(String prQPath, int bucketId, List<Object> shadowKeys) {
        this.buckToDispatchLock.lock();
        boolean mapWasEmpty = this.regionToDispatchedKeysMap.isEmpty();
        try {
            ConcurrentHashMap keysByBucket = (ConcurrentHashMap)this.regionToDispatchedKeysMap.get(prQPath);
            if (keysByBucket == null) {
                keysByBucket = new ConcurrentHashMap();
                this.regionToDispatchedKeysMap.put(prQPath, keysByBucket);
            }
            this.addRemovedEventsToMap(keysByBucket, bucketId, shadowKeys);
            if (mapWasEmpty) {
                this.regionToDispatchedKeysMapEmpty.signal();
            }
        }
        finally {
            this.buckToDispatchLock.unlock();
        }
    }

    /**
     * Appends {@code keys} to the list of dispatched keys stored for {@code bucketId}, creating
     * the list on first use.
     *
     * <p>Two defects of the previous version are addressed:
     * <ul>
     * <li>a {@code null} {@code keys} argument combined with an already existing list ended in
     * {@code addAll(null)}, which throws {@code NullPointerException}; {@code null} is now
     * treated as "no keys" in both cases;</li>
     * <li>the caller's list used to be stored directly in the map and was later mutated by
     * subsequent calls; the map now always owns a private {@link ArrayList}.</li>
     * </ul>
     *
     * @param bucketIdToDispatchedKeys map from bucket id to the list of dispatched keys
     * @param bucketId the bucket the keys belong to
     * @param keys the dispatched keys, may be {@code null}
     */
    private void addRemovedEventsToMap(Map bucketIdToDispatchedKeys, int bucketId, List keys) {
        List dispatchedKeys = (List)bucketIdToDispatchedKeys.get(bucketId);
        if (dispatchedKeys == null) {
            dispatchedKeys = new ArrayList();
            bucketIdToDispatchedKeys.put(bucketId, dispatchedKeys);
        }
        if (keys != null) {
            dispatchedKeys.addAll(keys);
        }
    }

    /**
     * Not supported by this queue; use {@code peek(int, int)} instead.
     *
     * @param batchSize ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public List peek(int batchSize) throws InterruptedException, CacheException {
        UnsupportedOperationException unsupported = new UnsupportedOperationException();
        throw unsupported;
    }

    /**
     * Collects up to {@code batchSize} events from the local primary bucket queues.
     *
     * <p>The batch is first seeded by {@code addPeekedEvents}. Then buckets are polled through
     * {@code getRandomPrimaryBucket} / {@code peekAhead} until the batch is full, the wait
     * budget is used up, or the thread is interrupted while sleeping. Every event added in the
     * polling loop is also appended to {@code peekedEvents}.
     *
     * @param batchSize the maximum number of events to return
     * @param timeToWait milliseconds to keep polling after the call started; {@code -1} makes
     *        the loop stop at the first empty result
     * @return the collected events, possibly empty
     */
    @Override
    public List peek(int batchSize, int timeToWait) throws InterruptedException, CacheException {
        boolean isDebugEnabled = logger.isDebugEnabled();
        PartitionedRegion prQ = this.getRandomShadowPR();
        ArrayList<GatewaySenderEventImpl> batch = new ArrayList<GatewaySenderEventImpl>();
        // No shadow region, or one with a local max memory of 0: pause briefly, possibly block
        // on the empty-queue condition, and return an empty batch.
        if (prQ == null || prQ.getLocalMaxMemory() == 0) {
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.blockProcesorThreadIfRequired();
            return batch;
        }
        long start = System.currentTimeMillis();
        long end = start + (long)timeToWait;
        // Seed the batch with events that were peeked earlier, if a reset was requested.
        this.addPeekedEvents(batch, batchSize);
        int bId = -1;
        while (batch.size() < batchSize) {
            if (this.areLocalBucketQueueRegionsPresent() && (bId = this.getRandomPrimaryBucket(prQ)) != -1) {
                GatewaySenderEventImpl object = (GatewaySenderEventImpl)this.peekAhead(prQ, bId);
                if (object != null) {
                    GatewaySenderEventImpl copy = object.makeHeapCopyIfOffHeap();
                    // A null copy means the event is skipped; it is counted in the stats when
                    // stats are available, and the loop moves on.
                    if (copy == null) {
                        if (this.stats == null) continue;
                        this.stats.incEventsNotQueuedConflated();
                        continue;
                    }
                    object = copy;
                }
                if (object != null) {
                    if (isDebugEnabled) {
                        logger.debug("The gatewayEventImpl in peek is {}", (Object)object);
                    }
                    batch.add(object);
                    this.peekedEvents.add(object);
                    continue;
                }
                // Nothing was peeked from the chosen bucket: stop if the wait budget is used up,
                // otherwise retry immediately (no sleep on this path).
                long currentTime = System.currentTimeMillis();
                if (isDebugEnabled) {
                    logger.debug("{}: Peeked object was null. Peek current time: {}", (Object)this, (Object)currentTime);
                }
                if (timeToWait == -1 || end <= currentTime) {
                    if (!isDebugEnabled) break;
                    logger.debug("{}: Peeked object was null.. Peek breaking", (Object)this);
                    break;
                }
                if (!isDebugEnabled) continue;
                logger.debug("{}: Peeked object was null. Peek continuing", (Object)this);
                continue;
            }
            // No local bucket region or no ready primary bucket: stop if the wait budget is used
            // up, otherwise sleep 50 ms before trying again.
            long currentTime = System.currentTimeMillis();
            if (isDebugEnabled) {
                logger.debug("{}: Peek current time: {}", (Object)this, (Object)currentTime);
            }
            if (timeToWait == -1 || end <= currentTime) {
                if (!isDebugEnabled) break;
                logger.debug("{}: Peek breaking", (Object)this);
                break;
            }
            if (isDebugEnabled) {
                logger.debug("{}: Peek continuing", (Object)this);
            }
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException e) {
                // Restore the interrupt status and return what has been collected so far.
                Thread.currentThread().interrupt();
                break;
            }
        }
        if (isDebugEnabled) {
            logger.debug("{}: Peeked a batch of {} entries. The size of the queue is {}. localSize is {}", (Object)this, (Object)batch.size(), (Object)this.size(), (Object)this.localSize());
        }
        // An empty batch may block the caller on the empty-queue condition before returning.
        if (batch.size() == 0) {
            this.blockProcesorThreadIfRequired();
        }
        return batch;
    }

    /**
     * Re-adds previously peeked events to {@code batch} when {@code resetLastPeeked} is set.
     *
     * <p>Events whose bucket is no longer primary on this member are first dropped from
     * {@code peekedEvents}. Then:
     * <ul>
     * <li>if a chunked re-delivery is already in progress, the next chunk is taken from
     * {@code peekedEventsProcessing};</li>
     * <li>otherwise, if more events remain than fit in one batch, they are copied to
     * {@code peekedEventsProcessing}, chunked re-delivery is started and the first chunk is
     * taken;</li>
     * <li>otherwise all of them are added at once and {@code resetLastPeeked} is cleared.</li>
     * </ul>
     *
     * @param batch the batch to fill
     * @param batchSize the maximum batch size
     */
    private void addPeekedEvents(List<GatewaySenderEventImpl> batch, int batchSize) {
        if (!this.resetLastPeeked) {
            return;
        }
        Iterator peekedIterator = this.peekedEvents.iterator();
        while (peekedIterator.hasNext()) {
            GatewaySenderEventImpl peeked = (GatewaySenderEventImpl)peekedIterator.next();
            int bucketId = peeked.getBucketId();
            PartitionedRegion region = (PartitionedRegion)peeked.getRegion();
            if (!region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
                peekedIterator.remove();
            }
        }
        if (this.peekedEventsProcessingInProgress) {
            this.addPreviouslyPeekedEvents(batch, batchSize);
        } else if (this.peekedEvents.size() > batchSize) {
            this.peekedEventsProcessing.addAll(this.peekedEvents);
            this.peekedEventsProcessingInProgress = true;
            this.addPreviouslyPeekedEvents(batch, batchSize);
        } else {
            batch.addAll(this.peekedEvents);
            this.resetLastPeeked = false;
        }
        if (logger.isDebugEnabled()) {
            StringBuffer description = new StringBuffer();
            for (GatewaySenderEventImpl batched : batch) {
                description.append("event :");
                description.append(batched);
            }
            logger.debug("Adding already peeked events to the batch {}", (Object)description);
        }
    }

    /**
     * Moves up to {@code batchSize} events from {@code peekedEventsProcessing} into
     * {@code batch}. Once the source is drained, both {@code resetLastPeeked} and
     * {@code peekedEventsProcessingInProgress} are cleared and the transfer stops.
     *
     * @param batch the batch to fill
     * @param batchSize the maximum number of events to move
     */
    private void addPreviouslyPeekedEvents(List<GatewaySenderEventImpl> batch, int batchSize) {
        int moved = 0;
        while (moved < batchSize) {
            batch.add((GatewaySenderEventImpl)this.peekedEventsProcessing.remove());
            ++moved;
            if (this.peekedEventsProcessing.isEmpty()) {
                this.resetLastPeeked = false;
                this.peekedEventsProcessingInProgress = false;
                return;
            }
        }
    }

    /**
     * Waits on {@code queueEmptyCondition} (argument {@code 1000L}) if the queue is flagged as
     * empty, then recomputes the flag from {@code localSizeForProcessor()}. Everything runs under
     * {@code queueEmptyLock}.
     *
     * @throws InterruptedException declared by this method; presumably raised by the wait
     */
    protected void blockProcesorThreadIfRequired() throws InterruptedException {
        this.queueEmptyLock.lock();
        try {
            if (this.isQueueEmpty) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Going to wait, till notified.");
                }
                this.queueEmptyCondition.await(1000L);
            }
            // Refresh the flag whether or not this thread waited.
            int pendingForProcessor = this.localSizeForProcessor();
            this.isQueueEmpty = pendingForProcessor == 0;
        }
        finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Going to unblock. isQueueEmpty {}", (Object)this.isQueueEmpty);
            }
            this.queueEmptyLock.unlock();
        }
    }

    /**
     * Peeks the next object from the local bucket queue with the given id.
     *
     * <p>The bucket lookup can yield {@code null} (other methods of this class check for that,
     * e.g. {@code getRandomPrimaryBucket} and {@code destroyEventFromQueue}). In that case this
     * method returns {@code null} instead of failing with a {@code NullPointerException}, just
     * as it does when the queue reports {@link BucketRegionQueueUnavailableException}.
     *
     * @param prQ the shadow region owning the bucket
     * @param bucketId the bucket to peek from
     * @return the peeked object, or {@code null} if the bucket is missing, unavailable or has
     *         nothing to offer
     */
    protected Object peekAhead(PartitionedRegion prQ, int bucketId) throws CacheException {
        Object object = null;
        BucketRegionQueue brq = this.getBucketRegionQueueByBucketId(prQ, bucketId);
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Peekahead for the bucket {}", (Object)this, (Object)bucketId);
        }
        if (brq == null) {
            // The bucket is not (or no longer) hosted locally; treat it like an unavailable queue.
            return null;
        }
        try {
            object = brq.peek();
        }
        catch (BucketRegionQueueUnavailableException e) {
            return object;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Peeked object from bucket {} object: {}", (Object)this, (Object)bucketId, object);
        }
        if (object == null && this.stats != null) {
            this.stats.incEventsNotQueuedConflated();
        }
        return object;
    }

    /**
     * Looks up the local bucket with the given id in the region's data store.
     *
     * @param prQ the shadow region to search
     * @param bucketId the bucket id
     * @return the result of {@code getLocalBucketById}, cast to {@link BucketRegionQueue};
     *         callers in this class check it for {@code null}
     */
    protected BucketRegionQueue getBucketRegionQueueByBucketId(PartitionedRegion prQ, int bucketId) {
        BucketRegionQueue bucketQueue = (BucketRegionQueue)prQ.getDataStore().getLocalBucketById(bucketId);
        return bucketQueue;
    }

    /**
     * Sums {@code getSizeOfLocalPrimaryBuckets()} over all shadow regions that have a data store.
     * At debug level, the running total is logged after each region.
     *
     * @return the accumulated size
     */
    public int localSize() {
        int total = 0;
        for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
            if (prQ != null && prQ.getDataStore() != null) {
                total += prQ.getDataStore().getSizeOfLocalPrimaryBuckets();
            }
            if (logger.isDebugEnabled()) {
                logger.debug("The name of the queue region is {} and the size is {}", (Object)prQ.getFullPath(), (Object)total);
            }
        }
        return total;
    }

    /**
     * Sums the sizes of the local primary bucket regions handled by this processor, i.e. those
     * whose id modulo {@code nDispatcher} equals {@code index}. Regions without a data store
     * contribute nothing. At debug level, the running total is logged after each region.
     *
     * @return the accumulated size
     */
    public int localSizeForProcessor() {
        int total = 0;
        for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
            if (((PartitionedRegion)prQ.getRegion()).getDataStore() != null) {
                Set<BucketRegion> localPrimaries = ((PartitionedRegion)prQ.getRegion()).getDataStore().getAllLocalPrimaryBucketRegions();
                for (BucketRegion bucket : localPrimaries) {
                    if (bucket.getId() % this.nDispatcher == this.index) {
                        total += bucket.size();
                    }
                }
            }
            if (logger.isDebugEnabled()) {
                logger.debug("The name of the queue region is {} and the size is {}", (Object)prQ.getFullPath(), (Object)total);
            }
        }
        return total;
    }

    /**
     * Sums {@code size()} over all shadow regions and adds the sender's
     * {@code getTmpQueuedEventSize()}.
     *
     * @return the accumulated size
     */
    @Override
    public int size() {
        int regionTotal = 0;
        for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
            if (logger.isDebugEnabled()) {
                logger.debug("The name of the queue region is {} and the size is {}. keyset size is {}", (Object)prQ.getName(), (Object)prQ.size(), (Object)prQ.keys().size());
            }
            regionTotal += prQ.size();
        }
        return regionTotal + this.sender.getTmpQueuedEventSize();
    }

    /**
     * Adds {@code listener} to every registered shadow region through its attributes mutator.
     *
     * @param listener the listener to add
     */
    @Override
    public void addCacheListener(CacheListener listener) {
        for (PartitionedRegion shadowRegion : this.userRegionNameToshadowPRMap.values()) {
            shadowRegion.getAttributesMutator().addCacheListener(listener);
        }
    }

    /**
     * Not supported by this queue.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void removeCacheListener() {
        UnsupportedOperationException unsupported = new UnsupportedOperationException();
        throw unsupported;
    }

    /**
     * Calls {@link #remove()} {@code batchSize} times; a non-positive value removes nothing.
     *
     * @param batchSize the number of removals to perform
     */
    @Override
    public void remove(int batchSize) throws CacheException {
        int remaining = batchSize;
        while (remaining-- > 0) {
            this.remove();
        }
    }

    /**
     * Wraps the arguments in a {@code ConflationHandler} and submits it to
     * {@code conflationExecutor}.
     *
     * @param conflatableObject the object to conflate
     * @param bucketId the bucket the object belongs to
     * @param tailKey the tail key passed on to the handler
     */
    public void conflateEvent(Conflatable conflatableObject, int bucketId, Long tailKey) {
        this.conflationExecutor.execute(new ConflationHandler(conflatableObject, bucketId, tailKey));
    }

    /**
     * Sums {@code getNumOverflowOnDisk()} over the disk statistics of all shadow regions.
     *
     * @return the total, or {@code 0} as soon as any shadow region has no disk statistics
     */
    public long getNumEntriesOverflowOnDiskTestOnly() {
        long overflowTotal = 0L;
        for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
            DiskRegionStats regionDiskStats = prQ.getDiskRegionStats();
            if (regionDiskStats != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: DiskRegionStats for shadow PR is NOT null. Returning the numEntriesOverflowOnDisk obtained from DiskRegionStats", (Object)this);
                }
                overflowTotal += regionDiskStats.getNumOverflowOnDisk();
                continue;
            }
            // One region without statistics makes the whole result 0.
            if (logger.isDebugEnabled()) {
                logger.debug("{}: DiskRegionStats for shadow PR is null. Returning the numEntriesOverflowOnDisk as 0", (Object)this);
            }
            return 0L;
        }
        return overflowTotal;
    }

    /**
     * Sums {@code getNumEntriesInVM()} over the disk statistics of all shadow regions.
     *
     * @return the total, or {@code 0} as soon as any shadow region has no disk statistics
     */
    public long getNumEntriesInVMTestOnly() {
        long inVmTotal = 0L;
        for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
            DiskRegionStats regionDiskStats = prQ.getDiskRegionStats();
            if (regionDiskStats != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: DiskRegionStats for shadow PR is NOT null. Returning the numEntriesInVM obtained from DiskRegionStats", (Object)this);
                }
                inVmTotal += regionDiskStats.getNumEntriesInVM();
                continue;
            }
            // One region without statistics makes the whole result 0.
            if (logger.isDebugEnabled()) {
                logger.debug("{}: DiskRegionStats for shadow PR is null. Returning the numEntriesInVM as 0", (Object)this);
            }
            return 0L;
        }
        return inVmTotal;
    }

    /**
     * Clears {@code regionToDispatchedKeysMap}, calls {@code shutdown()} on
     * {@code removalThread} and then calls {@code cleanupConflationThreadPool} for this
     * queue's sender, in that order.
     */
    public void cleanUp() {
        // 1. forget dispatched keys that were not yet processed
        this.regionToDispatchedKeysMap.clear();
        // 2. shut the removal thread down
        this.removalThread.shutdown();
        // 3. clean up the conflation thread pool
        this.cleanupConflationThreadPool(this.sender);
    }

    /**
     * Intentionally does nothing.
     */
    @Override
    public void close() {
        // no-op
    }

    /**
     * @return the live (not copied) map from bucket id to its temporary event queue
     */
    public Map<Integer, BlockingQueue<GatewaySenderEventImpl>> getBucketToTempQueueMap() {
        Map<Integer, BlockingQueue<GatewaySenderEventImpl>> tempQueues = this.bucketToTempQueueMap;
        return tempQueues;
    }

    /**
     * @param regionName the region name to test
     * @return {@code true} when the name contains {@code QSTRING}
     */
    public static boolean isParallelQueue(String regionName) {
        return regionName.indexOf(QSTRING) != -1;
    }

    /**
     * Builds a queue region name as {@code senderId + QSTRING + convertPathToName(regionPath)}.
     *
     * @param senderId the sender id
     * @param regionPath the user region path
     * @return the composed queue name
     */
    public static String getQueueName(String senderId, String regionPath) {
        String prefix = senderId + QSTRING;
        return prefix + ParallelGatewaySenderQueue.convertPathToName(regionPath);
    }

    /**
     * Extracts the part of {@code regionName} between its first character (exclusive) and the
     * first occurrence of {@code QSTRING} (exclusive).
     *
     * @param regionName a queue region name; it must contain {@code QSTRING}, otherwise
     *        {@code substring} throws
     * @return the extracted sender id
     */
    public static String getSenderId(String regionName) {
        int markerStart = regionName.indexOf(QSTRING);
        String senderId = regionName.substring(1, markerStart);
        return senderId;
    }

    /**
     * Adds up {@code sizer.sizeof(...)} for this queue and for its main internal collections and
     * the conflation executor.
     *
     * @param sizer the sizer to query
     * @return the sum of the individual estimates
     */
    public long estimateMemoryFootprint(SingleObjectSizer sizer) {
        return sizer.sizeof(this)
            + sizer.sizeof(this.regionToDispatchedKeysMap)
            + sizer.sizeof(this.userRegionNameToshadowPRMap)
            + sizer.sizeof(this.bucketToTempQueueMap)
            + sizer.sizeof(this.peekedEvents)
            + sizer.sizeof(this.conflationExecutor);
    }

    /**
     * Not supported by this queue.
     *
     * <p>Throws {@link UnsupportedOperationException}, like the other unsupported operations of
     * this class. It is a subclass of {@link RuntimeException}, so callers catching the latter
     * are unaffected; the message is unchanged.
     *
     * @param pr ignored
     * @param bucketId ignored
     * @throws UnsupportedOperationException always
     */
    public void clear(PartitionedRegion pr, int bucketId) {
        throw new UnsupportedOperationException("This method(clear)is not supported by ParallelGatewaySenderQueue");
    }

    /**
     * Not supported by this queue.
     *
     * <p>Throws {@link UnsupportedOperationException}, like the other unsupported operations of
     * this class. It is a subclass of {@link RuntimeException}, so callers catching the latter
     * are unaffected; the message is unchanged.
     *
     * @param pr ignored
     * @param bucketId ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException {
        throw new UnsupportedOperationException("This method(size)is not supported by ParallelGatewaySenderQueue");
    }

    /**
     * Factory for {@link ParallelGatewaySenderQueueMetaRegion} instances.
     */
    static class MetaRegionFactory {
        MetaRegionFactory() {
        }

        /**
         * Creates a meta region with no parent region.
         *
         * @param cache the owning cache
         * @param prQName the name of the region to create
         * @param ra the region attributes
         * @param sender the gateway sender the region belongs to
         * @return the new meta region
         */
        ParallelGatewaySenderQueueMetaRegion newMetataRegion(GemFireCacheImpl cache, String prQName, RegionAttributes ra, AbstractGatewaySender sender) {
            return new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, cache, sender);
        }
    }

    /**
     * Partitioned region used as the backing store of a parallel gateway sender queue. It
     * remembers its sender and overrides a handful of region capability flags with constants.
     */
    protected static class ParallelGatewaySenderQueueMetaRegion
    extends PartitionedRegion {
        AbstractGatewaySender sender;

        /**
         * @param regionName the region name
         * @param attrs the region attributes
         * @param parentRegion the parent region, may be {@code null}
         * @param cache the owning cache
         * @param pgSender the gateway sender this region belongs to
         */
        public ParallelGatewaySenderQueueMetaRegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion, GemFireCacheImpl cache, AbstractGatewaySender pgSender) {
            super(regionName, attrs, parentRegion, cache, ParallelGatewaySenderQueueMetaRegion.queueRegionArguments(pgSender));
            this.sender = pgSender;
        }

        /** Builds the internal region arguments passed to the superclass constructor. */
        private static InternalRegionArguments queueRegionArguments(AbstractGatewaySender pgSender) {
            return new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false).setSnapshotInputStream(null).setImageTarget(null).setIsUsedForParallelGatewaySenderQueue(true).setParallelGatewaySender(pgSender);
        }

        /** @return the sender given at construction time */
        @Override
        public final AbstractGatewaySender getParallelGatewaySender() {
            return this.sender;
        }

        /** @return always {@code true} */
        @Override
        public final boolean isUsedForParallelGatewaySenderQueue() {
            return true;
        }

        /** @return always {@code true} */
        @Override
        public final boolean isSecret() {
            return true;
        }

        /** @return always {@code false} */
        @Override
        protected boolean isCopyOnRead() {
            return false;
        }

        /** @return always {@code false} */
        @Override
        public final boolean supportsConcurrencyChecks() {
            return false;
        }

        /** @return always {@code false} */
        @Override
        protected final boolean shouldNotifyBridgeClients() {
            return false;
        }

        /** @return always {@code false} */
        @Override
        public final boolean generateEventID() {
            return false;
        }
    }

    /**
     * Daemon thread that periodically drains the enclosing queue's
     * {@code regionToDispatchedKeysMap} and broadcasts its contents.
     *
     * <p>Each cycle the thread:
     * <ol>
     * <li>waits on its own monitor for up to {@code messageSyncInterval} milliseconds;</li>
     * <li>under {@code buckToDispatchLock}, copies {@code regionToDispatchedKeysMap} into a private
     * snapshot and clears the shared map;</li>
     * <li>sends the snapshot in a {@link ParallelQueueRemovalMessage} to every member returned by
     * {@link #getAllRecipients(GemFireCacheImpl, Map)}.</li>
     * </ol>
     *
     * <p>The loop ends when {@link #shutdown()} has been called, when the cache's cancel criterion
     * reports a cancellation in progress, when the interval wait is interrupted, or when a
     * {@link CancelException} is thrown.
     */
    private class BatchRemovalThread
    extends Thread {
        /** Set by {@link #shutdown()}; volatile because it is written by the caller's thread and read by this one. */
        private volatile boolean shutdown;
        private final GemFireCacheImpl cache;
        private final ParallelGatewaySenderQueue parallelQueue;

        /**
         * Creates the (not yet started) daemon thread named {@code "BatchRemovalThread"}.
         *
         * @param c cache used for cancellation checks, region lookup and access to the distribution manager
         * @param queue queue reference stored in {@code parallelQueue}
         */
        public BatchRemovalThread(GemFireCacheImpl c, ParallelGatewaySenderQueue queue) {
            super("BatchRemovalThread");
            this.setDaemon(true);
            this.cache = c;
            this.parallelQueue = queue;
        }

        /**
         * @return {@code true} if {@link #shutdown()} was requested or the cache reports that a
         *         cancellation is in progress
         */
        private boolean checkCancelled() {
            return this.shutdown || this.cache.getCancelCriterion().isCancelInProgress();
        }

        /**
         * Main loop; see the class comment for the per-cycle steps. The "thread is done" info
         * message is logged on every exit path.
         */
        @Override
        public void run() {
            try {
                InternalDistributedSystem ids = this.cache.getDistributedSystem();
                DM dm = ids.getDistributionManager();
                while (true) {
                    // Inner try: be tolerant of failures in a single cycle and keep looping.
                    try {
                        if (this.checkCancelled()) {
                            break;
                        }

                        // Clear (and remember) any pending interrupt so the timed wait below is not
                        // aborted immediately; the flag is restored in the finally block.
                        boolean interrupted = Thread.interrupted();
                        try {
                            synchronized (this) {
                                this.wait(messageSyncInterval);
                            }
                        }
                        catch (InterruptedException e) {
                            // An interrupt during the wait is treated as a request to stop.
                            interrupted = true;
                            break;
                        }
                        finally {
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }

                        if (logger.isDebugEnabled()) {
                            // The map is only read under the lock, even for logging.
                            ParallelGatewaySenderQueue.this.buckToDispatchLock.lock();
                            try {
                                logger.debug("BatchRemovalThread about to query the batch removal map {}", (Object)ParallelGatewaySenderQueue.this.regionToDispatchedKeysMap);
                            }
                            finally {
                                ParallelGatewaySenderQueue.this.buckToDispatchLock.unlock();
                            }
                        }

                        // Raw type kept on purpose: the snapshot is handed unchanged to
                        // ParallelQueueRemovalMessage, whose parameter type is not visible here.
                        HashMap temp = new HashMap();
                        ParallelGatewaySenderQueue.this.buckToDispatchLock.lock();
                        try {
                            boolean wasEmpty = ParallelGatewaySenderQueue.this.regionToDispatchedKeysMap.isEmpty();
                            while (ParallelGatewaySenderQueue.this.regionToDispatchedKeysMap.isEmpty()) {
                                ParallelGatewaySenderQueue.this.regionToDispatchedKeysMapEmpty.await(15000L);
                            }
                            if (wasEmpty) {
                                // The map was empty on entry and has only just been populated:
                                // skip this cycle and pick the entries up after the next interval.
                                continue;
                            }
                            temp.putAll(ParallelGatewaySenderQueue.this.regionToDispatchedKeysMap);
                            ParallelGatewaySenderQueue.this.regionToDispatchedKeysMap.clear();
                        }
                        finally {
                            // Only release the lock here. A 'continue' in this finally block would
                            // discard the snapshot and make the dispatch below unreachable.
                            ParallelGatewaySenderQueue.this.buckToDispatchLock.unlock();
                        }

                        Set<InternalDistributedMember> recipients = this.getAllRecipients(this.cache, temp);
                        if (recipients.isEmpty()) {
                            continue;
                        }
                        ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp);
                        pqrm.setRecipients(recipients);
                        dm.putOutgoing(pqrm);
                    }
                    catch (CancelException e) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("BatchRemovalThread is exiting due to cancellation");
                        }
                        break;
                    }
                    catch (VirtualMachineError err) {
                        SystemFailure.initiateFailure(err);
                        // If initiateFailure ever returns, rethrow: this thread must not continue.
                        throw err;
                    }
                    catch (Throwable t) {
                        Error err;
                        if (t instanceof Error && SystemFailure.isJVMFailureError(err = (Error)t)) {
                            SystemFailure.initiateFailure(err);
                            throw err;
                        }
                        SystemFailure.checkFailure();
                        if (this.checkCancelled()) {
                            break;
                        }
                        if (logger.isDebugEnabled()) {
                            logger.debug("BatchRemovalThread: ignoring exception", t);
                        }
                    }
                }
            }
            catch (CancelException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("BatchRemovalThread exiting due to cancellation: " + e);
                }
            }
            finally {
                logger.info((Message)LocalizedMessage.create(LocalizedStrings.HARegionQueue_THE_QUEUEREMOVALTHREAD_IS_DONE));
            }
        }

        /**
         * Collects the data-store members advised for every region named by a key of {@code map}.
         *
         * <p>Each key is cast to {@link String} and looked up with {@code cache.getRegion}. Keys
         * whose region (or region advisor) is no longer available are skipped, so that one missing
         * region does not abort the whole cycle with a {@link NullPointerException}.
         *
         * @param cache cache used to resolve the regions
         * @param map map whose keys are passed to {@code cache.getRegion}; values are not read
         * @return the union of {@code adviseDataStore()} over all resolvable regions; possibly empty
         */
        private Set<InternalDistributedMember> getAllRecipients(GemFireCacheImpl cache, Map map) {
            ObjectOpenHashSet<InternalDistributedMember> recipients = new ObjectOpenHashSet<InternalDistributedMember>();
            for (Object pr : map.keySet()) {
                PartitionedRegion partitionedRegion = (PartitionedRegion)cache.getRegion((String)pr);
                if (partitionedRegion == null || partitionedRegion.getRegionAdvisor() == null) {
                    continue;
                }
                recipients.addAll(partitionedRegion.getRegionAdvisor().adviseDataStore());
            }
            return recipients;
        }

        /**
         * Requests termination: sets the shutdown flag, interrupts this thread and waits up to
         * 15 seconds for it to finish. The calling thread's own interrupt status is preserved. A
         * warning is logged if this thread is still alive after the wait.
         */
        public void shutdown() {
            this.shutdown = true;
            this.interrupt();
            // Clear the caller's pending interrupt (if any) so join() can block; restored below.
            boolean interrupted = Thread.interrupted();
            try {
                this.join(15000L);
            }
            catch (InterruptedException e) {
                interrupted = true;
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            if (this.isAlive()) {
                logger.warn((Message)LocalizedMessage.create(LocalizedStrings.HARegionQueue_QUEUEREMOVALTHREAD_IGNORED_CANCELLATION));
            }
        }
    }

    /**
     * Task that removes a previously queued event from the enclosing queue by calling
     * {@code destroyEventFromQueue} with the bucket id and tail key captured at construction time.
     */
    private class ConflationHandler
    implements Runnable {
        Conflatable conflatableObject;
        Long previousTailKeyTobeRemoved;
        int bucketId;

        /**
         * @param conflatableObject the event driving the conflation; {@link #run()} casts it to
         *        {@code GatewaySenderEventImpl}
         * @param bId bucket id passed to {@code destroyEventFromQueue}
         * @param previousTailKey tail key passed to {@code destroyEventFromQueue}
         */
        public ConflationHandler(Conflatable conflatableObject, int bId, Long previousTailKey) {
            this.conflatableObject = conflatableObject;
            this.previousTailKeyTobeRemoved = previousTailKey;
            this.bucketId = bId;
        }

        /**
         * Resolves the queue region registered under the full path of the event's leader region
         * and destroys the entry identified by {@code bucketId} and
         * {@code previousTailKeyTobeRemoved}.
         *
         * <p>An {@link EntryNotFoundException} is logged at debug level and otherwise ignored; in
         * that case the "Conflated" message is not logged.
         */
        @Override
        public void run() {
            PartitionedRegion prQ = null;
            GatewaySenderEventImpl event = (GatewaySenderEventImpl)this.conflatableObject;
            try {
                String regionPath = ColocationHelper.getLeaderRegion((PartitionedRegion)event.getRegion()).getFullPath();
                prQ = ParallelGatewaySenderQueue.this.userRegionNameToshadowPRMap.get(regionPath);
                ParallelGatewaySenderQueue.this.destroyEventFromQueue(prQ, this.bucketId, this.previousTailKeyTobeRemoved);
            }
            catch (EntryNotFoundException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: Not conflating {} due to EntryNotFoundException", (Object)this, this.conflatableObject.getKeyToConflate());
                }
                // Nothing was removed, so do not fall through to the success message below.
                return;
            }
            if (logger.isDebugEnabled()) {
                // Guard the dereference so that debug logging can never throw on a missing region.
                Object regionName = prQ == null ? null : prQ.getName();
                logger.debug("{}: Conflated {} for key={} in queue for region={}", (Object)this, this.conflatableObject.getValueToConflate(), this.conflatableObject.getKeyToConflate(), regionName);
            }
        }

        /**
         * Best-effort deserialization: a {@code byte[]} argument is passed to
         * {@code EntryEventImpl.deserialize}; any other argument is returned unchanged.
         *
         * @param serializedBytes value to deserialize
         * @return the deserialized object, or {@code serializedBytes} itself if it is not a
         *         {@code byte[]} or deserialization threw an exception
         */
        private Object deserialize(Object serializedBytes) {
            Object deserializedObject = serializedBytes;
            if (serializedBytes instanceof byte[]) {
                byte[] serializedBytesCast = (byte[])serializedBytes;
                try {
                    deserializedObject = EntryEventImpl.deserialize(serializedBytesCast);
                }
                catch (Exception ignored) {
                    // Deliberately swallowed: on failure the caller gets the raw bytes back.
                }
            }
            return deserializedObject;
        }
    }
}

