/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.channel.recoverable.memory;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannelEvent;
import org.apache.flume.channel.recoverable.memory.wal.WAL;
import org.apache.flume.channel.recoverable.memory.wal.WALEntry;
import org.apache.flume.channel.recoverable.memory.wal.WALReplayResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecoverableMemoryChannel
extends BasicChannelSemantics {
    private static final Logger LOG = LoggerFactory.getLogger(RecoverableMemoryChannel.class);
    public static final String WAL_DATA_DIR = "wal.dataDir";
    public static final String WAL_ROLL_SIZE = "wal.rollSize";
    public static final String WAL_MAX_LOGS_SIZE = "wal.maxLogsSize";
    public static final String WAL_MIN_RENTENTION_PERIOD = "wal.minRententionPeriod";
    public static final String WAL_WORKER_INTERVAL = "wal.workerInterval";
    public static final String CAPACITY = "capacity";
    public static final String KEEPALIVE = "keep-alive";
    public static final int DEFAULT_CAPACITY = 100;
    public static final int DEFAULT_KEEPALIVE = 3;
    private MemoryChannel memoryChannel = new MemoryChannel();
    private AtomicLong seqidGenerator = new AtomicLong(0L);
    private WAL<RecoverableMemoryChannelEvent> wal;
    private Semaphore queueRemaining;
    private int capacity;
    private int keepAlive;
    private volatile boolean open = false;

    public void configure(Context context) {
        this.memoryChannel.configure(context);
        int capacity = context.getInteger(CAPACITY, Integer.valueOf(100));
        if (this.queueRemaining == null) {
            this.queueRemaining = new Semaphore(capacity, true);
        } else if (capacity > this.capacity) {
            this.queueRemaining.release(capacity - this.capacity);
        } else if (capacity < this.capacity) {
            this.queueRemaining.acquireUninterruptibly(this.capacity - capacity);
        }
        this.capacity = capacity;
        this.keepAlive = context.getInteger(KEEPALIVE, Integer.valueOf(3));
        long rollSize = context.getLong(WAL_ROLL_SIZE, Long.valueOf(0x4000000L));
        long maxLogsSize = context.getLong(WAL_MAX_LOGS_SIZE, Long.valueOf(0x20000000L));
        long minLogRetentionPeriod = context.getLong(WAL_MIN_RENTENTION_PERIOD, Long.valueOf(300000L));
        long workerInterval = context.getLong(WAL_WORKER_INTERVAL, Long.valueOf(60000L));
        if (this.wal == null) {
            String homePath = System.getProperty("user.home").replace('\\', '/');
            String dataDir = context.getString(WAL_DATA_DIR, homePath + "/.flume/recoverable-memory-channel");
            try {
                this.wal = new WAL<RecoverableMemoryChannelEvent>(new File(dataDir), RecoverableMemoryChannelEvent.class, rollSize, maxLogsSize, minLogRetentionPeriod, workerInterval);
            }
            catch (IOException e) {
                Throwables.propagate((Throwable)e);
            }
        } else {
            this.wal.setRollSize(rollSize);
            this.wal.setMaxLogsSize(maxLogsSize);
            this.wal.setMinLogRetentionPeriod(minLogRetentionPeriod);
            this.wal.setWorkerInterval(workerInterval);
            LOG.warn(((Object)((Object)this)).getClass().getSimpleName() + " only supports " + "partial reconfiguration.");
        }
    }

    public synchronized void start() {
        LOG.info("Starting " + (Object)((Object)this));
        try {
            WALReplayResult<RecoverableMemoryChannelEvent> results = this.wal.replay();
            Preconditions.checkArgument((results.getSequenceID() >= 0L ? 1 : 0) != 0);
            LOG.info("Replay SequenceID " + results.getSequenceID());
            this.seqidGenerator.set(results.getSequenceID());
            int numResults = results.getResults().size();
            Preconditions.checkState((numResults <= this.capacity ? 1 : 0) != 0, (Object)("Capacity " + this.capacity + ", but we need to replay " + numResults));
            LOG.info("Replay Events " + numResults);
            for (WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
                this.seqidGenerator.set(Math.max(entry.getSequenceID(), this.seqidGenerator.get()));
            }
            for (WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
                Transaction transaction = null;
                try {
                    transaction = this.memoryChannel.getTransaction();
                    transaction.begin();
                    this.memoryChannel.put((Event)entry.getData());
                    transaction.commit();
                }
                catch (Exception e) {
                    if (transaction != null) {
                        try {
                            transaction.rollback();
                        }
                        catch (Exception ex) {
                            LOG.info("Error during rollback", (Throwable)ex);
                        }
                    }
                    Throwables.propagate((Throwable)e);
                }
                catch (Error e) {
                    if (transaction != null) {
                        try {
                            transaction.rollback();
                        }
                        catch (Exception ex) {
                            LOG.info("Error during rollback", (Throwable)ex);
                        }
                    }
                    throw e;
                }
                finally {
                    if (transaction == null) continue;
                    transaction.close();
                }
            }
        }
        catch (IOException e) {
            Throwables.propagate((Throwable)e);
        }
        super.start();
        this.open = true;
    }

    public synchronized void stop() {
        this.open = false;
        LOG.info("Stopping " + (Object)((Object)this));
        try {
            this.close();
        }
        catch (IOException e) {
            Throwables.propagate((Throwable)e);
        }
        super.stop();
    }

    protected BasicTransactionSemantics createTransaction() {
        return new RecoverableMemoryTransaction(this, this.memoryChannel);
    }

    private void commitEvents(List<RecoverableMemoryChannelEvent> events) throws IOException {
        ArrayList entries = Lists.newArrayList();
        for (RecoverableMemoryChannelEvent event : events) {
            entries.add(new WALEntry<RecoverableMemoryChannelEvent>(event, event.sequenceId));
        }
        this.wal.writeEntries(entries);
    }

    private void commitSequenceID(List<Long> seqids) throws IOException {
        this.wal.writeSequenceIDs(seqids);
    }

    private long nextSequenceID() {
        return this.seqidGenerator.incrementAndGet();
    }

    void close() throws IOException {
        if (this.wal != null) {
            this.wal.close();
        }
    }

    private static class RecoverableMemoryTransaction
    extends BasicTransactionSemantics {
        private Transaction transaction;
        private MemoryChannel memoryChannel;
        private RecoverableMemoryChannel channel;
        private List<Long> sequenceIds = Lists.newArrayList();
        private List<RecoverableMemoryChannelEvent> events = Lists.newArrayList();
        private int takes;

        private RecoverableMemoryTransaction(RecoverableMemoryChannel channel, MemoryChannel memoryChannel) {
            this.channel = channel;
            this.memoryChannel = memoryChannel;
            this.transaction = this.memoryChannel.getTransaction();
            this.takes = 0;
        }

        protected void doBegin() throws InterruptedException {
            this.transaction.begin();
        }

        protected void doPut(Event event) throws InterruptedException {
            if (!this.channel.open) {
                throw new ChannelException("Channel not open");
            }
            if (!this.channel.queueRemaining.tryAcquire(this.channel.keepAlive, TimeUnit.SECONDS)) {
                throw new ChannelException("Cannot acquire capacity");
            }
            RecoverableMemoryChannelEvent sequencedEvent = new RecoverableMemoryChannelEvent(event, this.channel.nextSequenceID());
            this.memoryChannel.put((Event)sequencedEvent);
            this.events.add(sequencedEvent);
        }

        protected Event doTake() throws InterruptedException {
            if (!this.channel.open) {
                throw new ChannelException("Channel not open");
            }
            RecoverableMemoryChannelEvent event = (RecoverableMemoryChannelEvent)this.memoryChannel.take();
            if (event != null) {
                this.sequenceIds.add(event.sequenceId);
                ++this.takes;
                return event.event;
            }
            return null;
        }

        protected void doCommit() throws InterruptedException {
            if (!this.channel.open) {
                throw new ChannelException("Channel not open");
            }
            if (this.sequenceIds.size() > 0) {
                try {
                    this.channel.commitSequenceID(this.sequenceIds);
                }
                catch (IOException e) {
                    throw new ChannelException("Unable to commit", (Throwable)e);
                }
            }
            if (!this.events.isEmpty()) {
                try {
                    this.channel.commitEvents(this.events);
                }
                catch (IOException e) {
                    throw new ChannelException("Unable to commit", (Throwable)e);
                }
            }
            this.transaction.commit();
            this.channel.queueRemaining.release(this.takes);
        }

        protected void doRollback() throws InterruptedException {
            this.sequenceIds.clear();
            this.events.clear();
            this.channel.queueRemaining.release(this.events.size());
            this.transaction.rollback();
        }

        protected void doClose() {
            this.sequenceIds.clear();
            this.events.clear();
            this.transaction.close();
        }
    }
}

