/*
 * Decompiled with CFR 0.152.
 */
package org.compass.core.lucene.engine.transaction.support;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.compass.core.Resource;
import org.compass.core.engine.SearchEngineException;
import org.compass.core.lucene.engine.LuceneSearchEngine;
import org.compass.core.lucene.engine.LuceneSearchEngineHits;
import org.compass.core.lucene.engine.LuceneSearchEngineInternalSearch;
import org.compass.core.lucene.engine.LuceneSearchEngineQuery;
import org.compass.core.lucene.engine.transaction.support.AbstractSearchTransactionProcessor;
import org.compass.core.lucene.engine.transaction.support.ResourceHashing;
import org.compass.core.lucene.engine.transaction.support.job.CreateTransactionJob;
import org.compass.core.lucene.engine.transaction.support.job.DeleteByQueryTransactionJob;
import org.compass.core.lucene.engine.transaction.support.job.DeleteTransactionJob;
import org.compass.core.lucene.engine.transaction.support.job.FlushCommitTransactionJob;
import org.compass.core.lucene.engine.transaction.support.job.TransactionJob;
import org.compass.core.lucene.engine.transaction.support.job.UpdateTransactionJob;
import org.compass.core.spi.InternalResource;
import org.compass.core.spi.ResourceKey;

public abstract class AbstractConcurrentTransactionProcessor
extends AbstractSearchTransactionProcessor {
    private final boolean waitForSearchOperations;
    private final boolean concurrentOperations;
    private final int concurrencyLevel;
    private Processor[] processors;
    private final ResourceHashing hashing;
    private final int backlog;
    private final long addTimeout;

    protected AbstractConcurrentTransactionProcessor(Log logger, LuceneSearchEngine searchEngine, boolean waitForSearchOperations, boolean concurrentOperations) {
        super(logger, searchEngine);
        this.waitForSearchOperations = waitForSearchOperations;
        this.concurrentOperations = concurrentOperations && searchEngine.getSettings().getSettingAsBoolean(this.getSettingName("concurrentOperations"), true);
        this.concurrencyLevel = searchEngine.getSettings().getSettingAsInt(this.getSettingName("concurrencyLevel"), 5);
        this.hashing = ResourceHashing.fromName(searchEngine.getSettings().getSetting(this.getSettingName("hashing"), "uid"));
        this.backlog = searchEngine.getSettings().getSettingAsInt(this.getSettingName("backlog"), 100);
        this.addTimeout = searchEngine.getSettings().getSettingAsTimeInMillis(this.getSettingName("addTimeout"), 10000L);
    }

    public boolean isConcurrentOperations() {
        return this.concurrentOperations;
    }

    public void begin() throws SearchEngineException {
    }

    public void prepare() throws SearchEngineException {
        if (this.concurrentOperations) {
            this.waitForJobs();
        }
        this.doPrepare();
    }

    protected abstract void doPrepare() throws SearchEngineException;

    public void commit(boolean onePhase) throws SearchEngineException {
        if (this.concurrentOperations) {
            this.waitForJobs();
        }
        this.doCommit(onePhase);
    }

    protected abstract void doCommit(boolean var1) throws SearchEngineException;

    public void rollback() throws SearchEngineException {
        this.clearJobs();
        this.doRollback();
    }

    protected abstract void doRollback() throws SearchEngineException;

    public void flush() throws SearchEngineException {
        this.waitForJobs();
        this.doFlush();
    }

    protected void doFlush() throws SearchEngineException {
    }

    public void create(InternalResource resource) throws SearchEngineException {
        CreateTransactionJob job = new CreateTransactionJob(resource);
        if (this.concurrentOperations) {
            this.prepareBeforeAsyncDirtyOperation(job);
            this.getProcessor(job).addJob(job);
        } else {
            this.doProcessJob(job);
        }
    }

    public void update(InternalResource resource) throws SearchEngineException {
        UpdateTransactionJob job = new UpdateTransactionJob(resource);
        if (this.concurrentOperations) {
            this.prepareBeforeAsyncDirtyOperation(job);
            this.getProcessor(job).addJob(job);
        } else {
            this.doProcessJob(job);
        }
    }

    public void delete(ResourceKey resourceKey) throws SearchEngineException {
        DeleteTransactionJob job = new DeleteTransactionJob(resourceKey);
        if (this.concurrentOperations) {
            this.prepareBeforeAsyncDirtyOperation(job);
            this.getProcessor(job).addJob(job);
        } else {
            this.doProcessJob(job);
        }
    }

    public void delete(LuceneSearchEngineQuery query) throws SearchEngineException {
        String[] calcSubIndexes;
        this.flush();
        for (String subIndex : calcSubIndexes = this.indexManager.getStore().calcSubIndexes(query.getSubIndexes(), query.getAliases())) {
            DeleteByQueryTransactionJob job = new DeleteByQueryTransactionJob(query.getQuery(), subIndex);
            if (this.concurrentOperations) {
                this.prepareBeforeAsyncDirtyOperation(job);
                this.getProcessor(job).addJob(job);
                continue;
            }
            this.doProcessJob(job);
        }
    }

    public void flushCommit(String ... aliases) throws SearchEngineException {
        this.flush();
        HashSet<String> calcSubIndexes = new HashSet<String>();
        if (aliases == null || aliases.length == 0) {
            calcSubIndexes.addAll(Arrays.asList(this.getDirtySubIndexes()));
        } else {
            HashSet<String> dirtySubIndxes = new HashSet<String>(Arrays.asList(this.getDirtySubIndexes()));
            HashSet<String> requiredSubIndexes = new HashSet<String>(Arrays.asList(this.indexManager.polyCalcSubIndexes(null, aliases, null)));
            for (String subIndex : this.indexManager.getSubIndexes()) {
                if (!dirtySubIndxes.contains(subIndex) || !requiredSubIndexes.contains(subIndex)) continue;
                calcSubIndexes.add(subIndex);
            }
        }
        for (String subIndex : calcSubIndexes) {
            FlushCommitTransactionJob job = new FlushCommitTransactionJob(subIndex);
            if (this.concurrentOperations) {
                this.prepareBeforeAsyncDirtyOperation(job);
                this.getProcessor(job).addJob(job);
                continue;
            }
            this.doProcessJob(job);
        }
        this.flush();
    }

    protected abstract String[] getDirtySubIndexes();

    protected abstract void doProcessJob(TransactionJob var1) throws SearchEngineException;

    protected abstract void prepareBeforeAsyncDirtyOperation(TransactionJob var1) throws SearchEngineException;

    public LuceneSearchEngineHits find(LuceneSearchEngineQuery query) throws SearchEngineException {
        if (this.waitForSearchOperations && this.concurrentOperations) {
            this.waitForJobs();
        }
        return this.doFind(query);
    }

    protected abstract LuceneSearchEngineHits doFind(LuceneSearchEngineQuery var1) throws SearchEngineException;

    public LuceneSearchEngineInternalSearch internalSearch(String[] subIndexes, String[] aliases) throws SearchEngineException {
        if (this.waitForSearchOperations && this.concurrentOperations) {
            this.waitForJobs();
        }
        return this.doInternalSearch(subIndexes, aliases);
    }

    protected abstract LuceneSearchEngineInternalSearch doInternalSearch(String[] var1, String[] var2) throws SearchEngineException;

    public Resource[] get(ResourceKey resourceKey) throws SearchEngineException {
        if (this.waitForSearchOperations && this.concurrentOperations) {
            this.waitForJobs();
        }
        return this.doGet(resourceKey);
    }

    protected abstract Resource[] doGet(ResourceKey var1) throws SearchEngineException;

    private void clearJobs() {
        if (!this.concurrentOperations || this.processors == null) {
            return;
        }
        InterruptedException ie = null;
        int lastId = -1;
        for (Processor processor : this.processors) {
            if (processor == null) continue;
            processor.clear();
            try {
                processor.stop();
            }
            catch (InterruptedException e) {
                lastId = processor.getId();
                ie = e;
            }
        }
        if (ie != null) {
            this.logger.warn((Object)("Failed to wait for processor [" + lastId + "] to stop, interrupted"), (Throwable)ie);
        }
        SearchEngineException exception = null;
        for (Processor processor : this.processors) {
            if (processor == null) continue;
            try {
                processor.waitTillStopped();
            }
            catch (InterruptedException e) {
                throw new SearchEngineException("Failed to wait for processor [" + processor.getId() + "] to be stopped / process all jobs", e);
            }
            exception = processor.getException();
        }
        if (exception != null) {
            this.logger.trace((Object)"EXception while waiting to clear jobs for rollback", exception);
        }
    }

    private void waitForJobs() throws SearchEngineException {
        if (!this.concurrentOperations || this.processors == null) {
            return;
        }
        InterruptedException ie = null;
        int lastId = -1;
        for (Processor processor : this.processors) {
            if (processor == null) continue;
            try {
                processor.stop();
            }
            catch (InterruptedException e) {
                lastId = processor.getId();
                ie = e;
            }
        }
        if (ie != null) {
            this.logger.warn((Object)("Failed to wait for processor [" + lastId + "] to stop, interrupted"), (Throwable)ie);
        }
        SearchEngineException exception = null;
        for (Processor processor : this.processors) {
            if (processor == null) continue;
            try {
                processor.waitTillStopped();
            }
            catch (InterruptedException e) {
                throw new SearchEngineException("Failed to wait for processor [" + processor.getId() + "] to be stopped / process all jobs", e);
            }
            exception = processor.getException();
        }
        if (exception != null) {
            throw exception;
        }
    }

    private Processor getProcessor(TransactionJob job) {
        int processorIndex;
        Processor processor;
        if (this.processors == null) {
            this.processors = new Processor[this.concurrencyLevel];
        }
        if ((processor = this.processors[processorIndex = this.hashing.hash(job) % this.concurrencyLevel]) == null) {
            this.processors[processorIndex] = processor = new Processor(processorIndex);
        }
        try {
            if (processor.needsReschedule()) {
                processor.start();
            }
        }
        catch (InterruptedException e) {
            throw new SearchEngineException("Failed to wait for processor [" + processor.getId() + "] to check if stopped", e);
        }
        return processor;
    }

    private class Processor
    implements Runnable {
        private final BlockingQueue<TransactionJob> jobs;
        private final int id;
        private volatile boolean stopped;
        private volatile CountDownLatch stopLatch;
        private volatile CountDownLatch startLatch;
        private volatile SearchEngineException exception;

        private Processor(int id) {
            this.jobs = new LinkedBlockingQueue<TransactionJob>(AbstractConcurrentTransactionProcessor.this.backlog);
            this.stopped = true;
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public SearchEngineException getException() {
            return this.exception;
        }

        public boolean needsReschedule() throws InterruptedException {
            if (this.stopped) {
                this.waitTillStopped();
            }
            return this.stopped;
        }

        public void start() {
            if (AbstractConcurrentTransactionProcessor.this.logger.isTraceEnabled()) {
                AbstractConcurrentTransactionProcessor.this.logger.trace((Object)("Processor [" + this.id + "]: Starting"));
            }
            this.startLatch = new CountDownLatch(1);
            this.stopped = false;
            AbstractConcurrentTransactionProcessor.this.indexManager.getExecutorManager().submit(this);
        }

        public void stop() throws InterruptedException {
            if (this.stopped) {
                return;
            }
            if (AbstractConcurrentTransactionProcessor.this.logger.isTraceEnabled()) {
                AbstractConcurrentTransactionProcessor.this.logger.trace((Object)("Processor [" + this.id + "]: Stopping"));
            }
            this.stopped = true;
        }

        public void clear() {
            this.jobs.clear();
        }

        public void waitTillStopped() throws InterruptedException {
            if (this.startLatch != null) {
                this.startLatch.await();
            }
            if (this.stopLatch != null) {
                this.stopLatch.await();
            }
        }

        public void addJob(TransactionJob job) throws SearchEngineException {
            if (this.exception != null) {
                throw this.exception;
            }
            try {
                boolean offered;
                if (AbstractConcurrentTransactionProcessor.this.logger.isTraceEnabled()) {
                    AbstractConcurrentTransactionProcessor.this.logger.trace((Object)("Processor [" + this.id + "]: Adding Job [" + job + "]"));
                }
                if (!(offered = this.jobs.offer(job, AbstractConcurrentTransactionProcessor.this.addTimeout, TimeUnit.MILLISECONDS))) {
                    throw new SearchEngineException("Processor [" + this.id + "]: Failed to add job [" + job + "] after [" + AbstractConcurrentTransactionProcessor.this.addTimeout + "ms] and backlog size [" + AbstractConcurrentTransactionProcessor.this.backlog + "]");
                }
            }
            catch (InterruptedException e) {
                throw new SearchEngineException("Processor [" + this.id + "]: Failed to add job [" + job + "], interrupted while adding to queue", e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            try {
                this.exception = null;
                this.stopLatch = new CountDownLatch(1);
                this.startLatch.countDown();
                if (AbstractConcurrentTransactionProcessor.this.logger.isTraceEnabled()) {
                    AbstractConcurrentTransactionProcessor.this.logger.trace((Object)("Processor [" + this.id + "]: Started"));
                }
                while (!this.stopped) {
                    TransactionJob job;
                    try {
                        job = this.jobs.poll(100L, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        if (this.stopped) break;
                        AbstractConcurrentTransactionProcessor.this.logger.warn((Object)("Processor [" + this.id + "]: Interrupted without being stopped"), (Throwable)e);
                        break;
                    }
                    if (job == null) continue;
                    try {
                        this.processJob(job);
                    }
                    catch (SearchEngineException e) {
                        this.exception = e;
                        break;
                    }
                }
                if (this.exception != null) {
                    if (AbstractConcurrentTransactionProcessor.this.logger.isTraceEnabled()) {
                        AbstractConcurrentTransactionProcessor.this.logger.trace((Object)("Processor [" + this.id + "]: Stopping because of an exception"), (Throwable)this.exception);
                    }
                } else {
                    if (AbstractConcurrentTransactionProcessor.this.logger.isTraceEnabled()) {
                        AbstractConcurrentTransactionProcessor.this.logger.trace((Object)("Processor [" + this.id + "]: Received stop, processing remaining jobs"));
                    }
                    try {
                        this.processRemainingJobs();
                    }
                    catch (SearchEngineException e) {
                        if (AbstractConcurrentTransactionProcessor.this.logger.isTraceEnabled()) {
                            AbstractConcurrentTransactionProcessor.this.logger.trace((Object)("Processor [" + this.id + "]: Failed to processes remaining jobs"), (Throwable)e);
                        }
                        this.exception = e;
                    }
                }
                if (AbstractConcurrentTransactionProcessor.this.logger.isTraceEnabled()) {
                    AbstractConcurrentTransactionProcessor.this.logger.trace((Object)("Processor [" + this.id + "]: Stopped"));
                }
            }
            catch (Exception e) {
                AbstractConcurrentTransactionProcessor.this.logger.warn((Object)("Processor [" + this.id + "]: Recevied an unexpected exception"), (Throwable)e);
            }
            finally {
                this.stopLatch.countDown();
            }
        }

        private void processRemainingJobs() throws SearchEngineException {
            ArrayList remainingJobs = new ArrayList();
            this.jobs.drainTo(remainingJobs);
            for (TransactionJob job : remainingJobs) {
                this.processJob(job);
            }
        }

        private void processJob(TransactionJob job) throws SearchEngineException {
            if (AbstractConcurrentTransactionProcessor.this.logger.isTraceEnabled()) {
                AbstractConcurrentTransactionProcessor.this.logger.trace((Object)("Processor [" + this.id + "]: Processing Job  [" + job + "]"));
            }
            AbstractConcurrentTransactionProcessor.this.doProcessJob(job);
        }
    }
}

