package org.apache.flume.channel.file;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/channel/file/FileChannel.class */
public class FileChannel extends BasicChannelSemantics {
    private int capacity;
    private int keepAlive;
    private int transactionCapacity;
    private long checkpointInterval;
    private long maxFileSize;
    private File checkpointDir;
    private File[] dataDirs;
    private Log log;
    private boolean shutdownHookAdded;
    private Thread shutdownHook;
    private volatile boolean open;
    private Semaphore queueRemaining;
    private final ThreadLocal<FileBackedTransaction> transactions = new ThreadLocal<>();
    private static final Logger LOG = LoggerFactory.getLogger(FileChannel.class);
    private static final AtomicLong TRANSACTION_ID = new AtomicLong(System.currentTimeMillis());

    /* loaded from: input_file:org/apache/flume/channel/file/FileChannel$FileBackedTransaction.class */
    static class FileBackedTransaction extends BasicTransactionSemantics {
        private final LinkedBlockingDeque<FlumeEventPointer> takeList;
        private final LinkedBlockingDeque<FlumeEventPointer> putList;
        private final long transactionID;
        private final int keepAlive;
        private final Log log;
        private final FlumeEventQueue queue;
        private final Semaphore queueRemaining;

        public FileBackedTransaction(Log log, long j, int i, int i2, Semaphore semaphore) {
            this.log = log;
            this.queue = log.getFlumeEventQueue();
            this.transactionID = j;
            this.keepAlive = i2;
            this.queueRemaining = semaphore;
            this.putList = new LinkedBlockingDeque<>(i);
            this.takeList = new LinkedBlockingDeque<>(i);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isClosed() {
            return BasicTransactionSemantics.State.CLOSED.equals(getState());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getStateAsString() {
            return String.valueOf(getState());
        }

        protected void doPut(Event event) throws InterruptedException {
            if (this.putList.remainingCapacity() == 0) {
                throw new ChannelException("Put queue for FileBackedTransaction of capacity " + this.putList.size() + " full, consider committing more frequently, increasing capacity or increasing thread count");
            }
            if (!this.queueRemaining.tryAcquire(this.keepAlive, TimeUnit.SECONDS)) {
                throw new ChannelException("Cannot acquire capacity");
            }
            try {
                Preconditions.checkState(this.putList.offer(this.log.put(this.transactionID, event)));
            } catch (IOException e) {
                throw new ChannelException("Put failed due to IO error", e);
            }
        }

        protected Event doTake() throws InterruptedException {
            if (this.takeList.remainingCapacity() == 0) {
                throw new ChannelException("Take list for FileBackedTransaction, capacity " + this.takeList.size() + " full, consider committing more frequently, increasing capacity, or increasing thread count");
            }
            FlumeEventPointer removeHead = this.queue.removeHead();
            if (removeHead == null) {
                return null;
            }
            try {
                Preconditions.checkState(this.takeList.offer(removeHead));
                this.log.take(this.transactionID, removeHead);
                return this.log.get(removeHead);
            } catch (IOException e) {
                throw new ChannelException("Take failed due to IO error", e);
            }
        }

        protected void doCommit() throws InterruptedException {
            int size = this.putList.size();
            int size2 = this.takeList.size();
            if (size > 0) {
                Preconditions.checkState(size2 == 0);
                synchronized (this.queue) {
                    while (!this.putList.isEmpty()) {
                        if (!this.queue.addTail(this.putList.removeFirst())) {
                            StringBuilder sb = new StringBuilder();
                            sb.append("Queue add failed, this shouldn't be able to ");
                            sb.append("happen. A portion of the transaction has been ");
                            sb.append("added to the queue but the remaining portion ");
                            sb.append("cannot be added. Those messages will be consumed ");
                            sb.append("despite this transaction failing. Please report.");
                            FileChannel.LOG.error(sb.toString());
                            Preconditions.checkState(false, sb.toString());
                        }
                    }
                }
                try {
                    this.log.commitPut(this.transactionID);
                } catch (IOException e) {
                    throw new ChannelException("Commit failed due to IO error", e);
                }
            } else if (size2 > 0) {
                try {
                    this.log.commitTake(this.transactionID);
                    this.queueRemaining.release(size2);
                } catch (IOException e2) {
                    throw new ChannelException("Commit failed due to IO error", e2);
                }
            }
            this.putList.clear();
            this.takeList.clear();
        }

        protected void doRollback() throws InterruptedException {
            int size = this.putList.size();
            if (this.takeList.size() > 0) {
                Preconditions.checkState(size == 0);
                while (!this.takeList.isEmpty()) {
                    Preconditions.checkState(this.queue.addHead(this.takeList.removeLast()), "Queue add failed, this shouldn't be able to happen");
                }
            }
            this.queueRemaining.release(size);
            try {
                this.log.rollback(this.transactionID);
                this.putList.clear();
                this.takeList.clear();
            } catch (IOException e) {
                throw new ChannelException("Commit failed due to IO error", e);
            }
        }
    }

    public void configure(Context context) {
        String replace = System.getProperty("user.home").replace('\\', '/');
        String string = context.getString(FileChannelConfiguration.CHECKPOINT_DIR, replace + "/.flume/file-channel/checkpoint");
        String[] split = context.getString(FileChannelConfiguration.DATA_DIRS, replace + "/.flume/file-channel/data").split(",");
        if (this.checkpointDir == null) {
            this.checkpointDir = new File(string);
        } else if (!this.checkpointDir.getAbsolutePath().equals(new File(string).getAbsolutePath())) {
            LOG.warn("An attempt was made to change the checkpoint directory after start, this is not supported.");
        }
        if (this.dataDirs == null) {
            this.dataDirs = new File[split.length];
            for (int i = 0; i < split.length; i++) {
                this.dataDirs[i] = new File(split[i]);
            }
        } else {
            boolean z = false;
            if (this.dataDirs.length != split.length) {
                z = true;
            } else {
                int i2 = 0;
                while (true) {
                    if (i2 >= split.length) {
                        break;
                    }
                    if (!this.dataDirs[i2].getAbsolutePath().equals(new File(split[i2]).getAbsolutePath())) {
                        z = true;
                        break;
                    }
                    i2++;
                }
            }
            if (z) {
                LOG.warn("An attempt was made to change the data directories after start, this is not supported.");
            }
        }
        int intValue = context.getInteger(FileChannelConfiguration.CAPACITY, Integer.valueOf(FileChannelConfiguration.DEFAULT_CAPACITY)).intValue();
        if (this.capacity <= 0 || intValue == this.capacity) {
            this.capacity = intValue;
        } else {
            LOG.warn("Capacity of this channel cannot be sized on the fly due the requirement we have enough DirectMemory for the queue and downsizing of the queue cannot be guranteed due to the fact there maybe more items on the queue than the new capacity.");
        }
        this.keepAlive = context.getInteger(FileChannelConfiguration.KEEP_ALIVE, 3).intValue();
        this.transactionCapacity = context.getInteger(FileChannelConfiguration.TRANSACTION_CAPACITY, Integer.valueOf(FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY)).intValue();
        this.checkpointInterval = context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL, Long.valueOf(FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL)).longValue();
        this.maxFileSize = Math.min(context.getLong(FileChannelConfiguration.MAX_FILE_SIZE, 2146435071L).longValue(), 2146435071L);
        if (this.queueRemaining == null) {
            this.queueRemaining = new Semaphore(this.capacity, true);
        }
        if (this.log != null) {
            this.log.setCheckpointInterval(this.checkpointInterval);
            this.log.setMaxFileSize(this.maxFileSize);
        }
    }

    public synchronized void start() {
        LOG.info("Starting FileChannel with dataDir " + Arrays.toString(this.dataDirs));
        try {
            this.log = new Log(this.checkpointInterval, this.maxFileSize, this.capacity, this.checkpointDir, this.dataDirs);
            this.log.replay();
        } catch (IOException e) {
            Throwables.propagate(e);
        }
        this.open = true;
        try {
            int depth = getDepth();
            Preconditions.checkState(this.queueRemaining.tryAcquire(depth), "Unable to acquire " + depth + " permits");
            LOG.info("Queue Size after replay: " + depth);
            if (!this.shutdownHookAdded) {
                this.shutdownHookAdded = true;
                LOG.info("Adding shutdownhook for " + this);
                this.shutdownHook = new Thread() { // from class: org.apache.flume.channel.file.FileChannel.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        String arrays = Arrays.toString(this.dataDirs);
                        FileChannel.LOG.info("Closing FileChannel " + arrays);
                        try {
                            this.close();
                        } catch (Exception e2) {
                            FileChannel.LOG.error("Error closing fileChannel " + arrays, e2);
                        }
                    }
                };
                Runtime.getRuntime().addShutdownHook(this.shutdownHook);
            }
            if (0 != 0) {
                this.open = false;
            }
            super.start();
        } catch (Throwable th) {
            if (1 != 0) {
                this.open = false;
            }
            throw th;
        }
    }

    public synchronized void stop() {
        LOG.info("Stopping FileChannel with dataDir " + Arrays.toString(this.dataDirs));
        try {
            if (this.shutdownHookAdded && this.shutdownHook != null) {
                Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
                this.shutdownHookAdded = false;
                this.shutdownHook = null;
            }
            super.stop();
        } finally {
            close();
        }
    }

    protected BasicTransactionSemantics createTransaction() {
        Preconditions.checkState(this.open, "Channel closed");
        FileBackedTransaction fileBackedTransaction = this.transactions.get();
        if (fileBackedTransaction != null && !fileBackedTransaction.isClosed()) {
            Preconditions.checkState(false, "Thread has transaction which is still open: " + fileBackedTransaction.getStateAsString());
        }
        FileBackedTransaction fileBackedTransaction2 = new FileBackedTransaction(this.log, TRANSACTION_ID.incrementAndGet(), this.transactionCapacity, this.keepAlive, this.queueRemaining);
        this.transactions.set(fileBackedTransaction2);
        return fileBackedTransaction2;
    }

    int getDepth() {
        Preconditions.checkState(this.open, "Channel closed");
        Preconditions.checkNotNull(this.log, "log");
        FlumeEventQueue flumeEventQueue = this.log.getFlumeEventQueue();
        Preconditions.checkNotNull(flumeEventQueue, "queue");
        return flumeEventQueue.size();
    }

    void close() {
        if (this.open) {
            this.open = false;
            this.log.close();
            this.log = null;
            this.queueRemaining = null;
        }
    }
}
