package org.apache.flume.channel.file;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.flume.Event;
import org.apache.flume.channel.file.LogFile;
import org.apache.flume.channel.file.TransactionEventRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/channel/file/Log.class */
class Log {
    public static final String PREFIX = "log-";
    private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
    private static final int MIN_NUM_LOGS = 2;
    private static final String FILE_LOCK = "in_use.lock";
    private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections.synchronizedMap(new HashMap());
    private final AtomicInteger nextFileID = new AtomicInteger(0);
    private final File checkpointDir;
    private final File[] logDirs;
    private final BackgroundWorker worker;
    private final int queueSize;
    private final AtomicReferenceArray<LogFile.Writer> logFiles;
    private volatile boolean open;
    private AtomicReference<Checkpoint> checkpoint;
    private Checkpoint checkpointA;
    private Checkpoint checkpointB;
    private FlumeEventQueue queue;
    private long checkpointInterval;
    private long maxFileSize;
    private final Map<String, FileLock> locks;

    /* loaded from: input_file:org/apache/flume/channel/file/Log$BackgroundWorker.class */
    static class BackgroundWorker extends Thread {
        private static final Logger LOG = LoggerFactory.getLogger(BackgroundWorker.class);
        private final Log log;
        private volatile boolean run = true;

        public BackgroundWorker(Log log) {
            this.log = log;
        }

        void shutdown() {
            if (this.run) {
                this.run = false;
                interrupt();
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.run) {
                try {
                    try {
                        Thread.sleep(Math.max(1000L, this.log.checkpointInterval / 10));
                        if (this.log.open && System.currentTimeMillis() - this.log.getLastCheckpoint() > this.log.checkpointInterval) {
                            this.log.writeCheckpoint();
                        }
                        if (this.log.open) {
                            this.log.removeOldLogs();
                        }
                    } catch (InterruptedException e) {
                    }
                } catch (IOException e2) {
                    LOG.error("Error doing checkpoint", e2);
                } catch (Exception e3) {
                    LOG.error("General error in checkpoint worker", e3);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Log(long j, long j2, int i, File file, File... fileArr) throws IOException {
        Preconditions.checkArgument(j > 0, "checkpointInterval <= 0");
        Preconditions.checkArgument(i > 0, "queueSize <= 0");
        Preconditions.checkArgument(j2 > 0, "maxFileSize <= 0");
        Preconditions.checkNotNull(file, FileChannelConfiguration.CHECKPOINT_DIR);
        Preconditions.checkArgument(file.isDirectory() || file.mkdirs(), "CheckpointDir " + file + " could not be created");
        Preconditions.checkNotNull(fileArr, "logDirs");
        Preconditions.checkArgument(fileArr.length > 0, "logDirs empty");
        for (File file2 : fileArr) {
            Preconditions.checkArgument(file2.isDirectory() || file2.mkdirs(), "LogDir " + file2 + " could not be created");
        }
        this.locks = Maps.newHashMap();
        try {
            lock(file);
            for (File file3 : fileArr) {
                lock(file3);
            }
            this.open = false;
            this.checkpointInterval = j;
            this.maxFileSize = j2;
            this.queueSize = i;
            this.checkpointDir = file;
            this.logDirs = fileArr;
            this.logFiles = new AtomicReferenceArray<>(this.logDirs.length);
            this.worker = new BackgroundWorker(this);
            this.worker.setName("Log-BackgroundWorker");
            this.worker.setDaemon(true);
            this.worker.start();
        } catch (IOException e) {
            unlock(file);
            for (File file4 : fileArr) {
                unlock(file4);
            }
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Removed duplicated region for block: B:24:0x02ae A[Catch: Exception -> 0x02e7, LOOP:2: B:22:0x02a4->B:24:0x02ae, LOOP_END, TryCatch #1 {Exception -> 0x02e7, blocks: (B:6:0x0011, B:9:0x0038, B:10:0x004a, B:12:0x0054, B:14:0x00b9, B:16:0x00bf, B:36:0x013a, B:21:0x0242, B:22:0x02a4, B:24:0x02ae, B:26:0x02db, B:18:0x01a1, B:31:0x01b3, B:20:0x021a, B:34:0x01f2, B:39:0x0179), top: B:5:0x0011, inners: #0, #2 }] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public synchronized void replay() throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 756
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flume.channel.file.Log.replay():void");
    }

    int getNextFileID() {
        Preconditions.checkState(this.open, "Log is closed");
        return this.nextFileID.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlumeEventQueue getFlumeEventQueue() {
        Preconditions.checkState(this.open, "Log is closed");
        return this.queue;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlumeEvent get(FlumeEventPointer flumeEventPointer) throws IOException, InterruptedException {
        Preconditions.checkState(this.open, "Log is closed");
        int fileID = flumeEventPointer.getFileID();
        LogFile.RandomReader randomReader = this.idLogFileMap.get(Integer.valueOf(fileID));
        Preconditions.checkNotNull(randomReader, "LogFile is null for id " + fileID);
        return randomReader.get(flumeEventPointer.getOffset());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlumeEventPointer put(long j, Event event) throws IOException {
        Preconditions.checkState(this.open, "Log is closed");
        Put put = new Put(Long.valueOf(j), new FlumeEvent(event.getHeaders(), event.getBody()));
        put.setTimestamp(System.currentTimeMillis());
        ByteBuffer byteBuffer = TransactionEventRecord.toByteBuffer(put);
        int nextLogWriter = nextLogWriter(j);
        if (this.logFiles.get(nextLogWriter).isRollRequired(byteBuffer)) {
            roll(nextLogWriter, byteBuffer);
        }
        boolean z = true;
        try {
            FlumeEventPointer put2 = this.logFiles.get(nextLogWriter).put(byteBuffer);
            z = false;
            if (0 != 0) {
                roll(nextLogWriter);
            }
            return put2;
        } catch (Throwable th) {
            if (z) {
                roll(nextLogWriter);
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void take(long j, FlumeEventPointer flumeEventPointer) throws IOException {
        Preconditions.checkState(this.open, "Log is closed");
        Take take = new Take(Long.valueOf(j), flumeEventPointer.getOffset(), flumeEventPointer.getFileID());
        take.setTimestamp(System.currentTimeMillis());
        ByteBuffer byteBuffer = TransactionEventRecord.toByteBuffer(take);
        int nextLogWriter = nextLogWriter(j);
        if (this.logFiles.get(nextLogWriter).isRollRequired(byteBuffer)) {
            roll(nextLogWriter, byteBuffer);
        }
        boolean z = true;
        try {
            this.logFiles.get(nextLogWriter).take(byteBuffer);
            z = false;
            if (0 != 0) {
                roll(nextLogWriter);
            }
        } catch (Throwable th) {
            if (z) {
                roll(nextLogWriter);
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void rollback(long j) throws IOException {
        Preconditions.checkState(this.open, "Log is closed");
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Rolling back " + j);
        }
        Rollback rollback = new Rollback(Long.valueOf(j));
        rollback.setTimestamp(System.currentTimeMillis());
        ByteBuffer byteBuffer = TransactionEventRecord.toByteBuffer(rollback);
        int nextLogWriter = nextLogWriter(j);
        if (this.logFiles.get(nextLogWriter).isRollRequired(byteBuffer)) {
            roll(nextLogWriter, byteBuffer);
        }
        boolean z = true;
        try {
            this.logFiles.get(nextLogWriter).rollback(byteBuffer);
            z = false;
            if (0 != 0) {
                roll(nextLogWriter);
            }
        } catch (Throwable th) {
            if (z) {
                roll(nextLogWriter);
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void commitPut(long j) throws IOException, InterruptedException {
        Preconditions.checkState(this.open, "Log is closed");
        commit(j, TransactionEventRecord.Type.PUT.get());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void commitTake(long j) throws IOException, InterruptedException {
        Preconditions.checkState(this.open, "Log is closed");
        commit(j, TransactionEventRecord.Type.TAKE.get());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void close() {
        this.open = false;
        if (this.worker != null) {
            this.worker.shutdown();
        }
        if (this.logFiles != null) {
            for (int i = 0; i < this.logFiles.length(); i++) {
                this.logFiles.get(i).close();
            }
        }
        synchronized (this.idLogFileMap) {
            Iterator<Integer> it = this.idLogFileMap.keySet().iterator();
            while (it.hasNext()) {
                LogFile.RandomReader randomReader = this.idLogFileMap.get(it.next());
                if (randomReader != null) {
                    randomReader.close();
                }
            }
        }
        try {
            unlock(this.checkpointDir);
        } catch (IOException e) {
            LOGGER.warn("Error unlocking " + this.checkpointDir, e);
        }
        for (File file : this.logDirs) {
            try {
                unlock(file);
            } catch (IOException e2) {
                LOGGER.warn("Error unlocking " + file, e2);
            }
        }
    }

    synchronized void shutdownWorker() {
        Preconditions.checkNotNull(this.worker, "worker");
        this.worker.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setCheckpointInterval(long j) {
        this.checkpointInterval = j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setMaxFileSize(long j) {
        this.maxFileSize = j;
    }

    private void commit(long j, short s) throws IOException {
        Commit commit = new Commit(Long.valueOf(j), s);
        commit.setTimestamp(System.currentTimeMillis());
        ByteBuffer byteBuffer = TransactionEventRecord.toByteBuffer(commit);
        int nextLogWriter = nextLogWriter(j);
        if (this.logFiles.get(nextLogWriter).isRollRequired(byteBuffer)) {
            roll(nextLogWriter, byteBuffer);
        }
        boolean z = true;
        try {
            this.logFiles.get(nextLogWriter).commit(byteBuffer);
            z = false;
            if (0 != 0) {
                roll(nextLogWriter);
            }
        } catch (Throwable th) {
            if (z) {
                roll(nextLogWriter);
            }
            throw th;
        }
    }

    private int nextLogWriter(long j) {
        return (int) Math.abs(j % this.logFiles.length());
    }

    private void roll(int i) throws IOException {
        roll(i, null);
    }

    private synchronized void roll(int i, ByteBuffer byteBuffer) throws IOException {
        LogFile.Writer writer = this.logFiles.get(i);
        if (writer == null || byteBuffer == null || writer.isRollRequired(byteBuffer)) {
            try {
                LOGGER.info("Roll start " + this.logDirs[i]);
                int incrementAndGet = this.nextFileID.incrementAndGet();
                File file = new File(this.logDirs[i], PREFIX + incrementAndGet);
                Preconditions.checkState(!file.exists(), "File alread exists " + file);
                Preconditions.checkState(file.createNewFile(), "File could not be created " + file);
                this.idLogFileMap.put(Integer.valueOf(incrementAndGet), new LogFile.RandomReader(file));
                this.logFiles.set(i, new LogFile.Writer(file, incrementAndGet, this.maxFileSize));
                if (writer != null) {
                    writer.close();
                }
                LOGGER.info("Roll end");
            } catch (Throwable th) {
                LOGGER.info("Roll end");
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void writeCheckpoint() throws IOException {
        synchronized (this.queue) {
            this.checkpoint.get().write(this.queue);
            if (!this.checkpoint.compareAndSet(this.checkpointA, this.checkpointB)) {
                Preconditions.checkState(this.checkpoint.compareAndSet(this.checkpointB, this.checkpointA));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getLastCheckpoint() throws IOException {
        Preconditions.checkState(this.open, "Log is closed");
        return this.checkpoint.get().getTimestamp();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeOldLogs() {
        Preconditions.checkState(this.open, "Log is closed");
        TreeSet treeSet = new TreeSet(this.queue.getFileIDs());
        for (int i = 0; i < this.logDirs.length; i++) {
            treeSet.add(Integer.valueOf(this.logFiles.get(i).getFileID()));
        }
        int intValue = ((Integer) Collections.min(treeSet)).intValue();
        LOGGER.debug("Files currently in use: " + treeSet);
        for (File file : this.logDirs) {
            List<File> logs = LogUtils.getLogs(file);
            LogUtils.sort(logs);
            int size = logs.size() - MIN_NUM_LOGS;
            for (int i2 = 0; i2 < size; i2++) {
                File file2 = logs.get(i2);
                int iDForFile = LogUtils.getIDForFile(file2);
                if (iDForFile < intValue) {
                    LogFile.RandomReader remove = this.idLogFileMap.remove(Integer.valueOf(iDForFile));
                    if (remove != null) {
                        remove.close();
                    }
                    LOGGER.info("Removing old log " + file2 + ", result = " + file2.delete() + ", minFileID " + intValue);
                }
            }
        }
    }

    private void lock(File file) throws IOException {
        FileLock tryLock = tryLock(file);
        if (tryLock == null) {
            String str = "Cannot lock " + file + ". The directory is already locked.";
            LOGGER.info(str);
            throw new IOException(str);
        }
        FileLock tryLock2 = tryLock(file);
        if (tryLock2 != null) {
            LOGGER.warn("Directory " + file + " does not support locking");
            tryLock2.release();
            tryLock2.channel().close();
        }
        this.locks.put(file.getAbsolutePath(), tryLock);
    }

    private FileLock tryLock(File file) throws IOException {
        File file2 = new File(file, FILE_LOCK);
        file2.deleteOnExit();
        RandomAccessFile randomAccessFile = new RandomAccessFile(file2, "rws");
        try {
            return randomAccessFile.getChannel().tryLock();
        } catch (IOException e) {
            LOGGER.error("Cannot create lock on " + file2, e);
            randomAccessFile.close();
            throw e;
        } catch (OverlappingFileLockException e2) {
            randomAccessFile.close();
            return null;
        }
    }

    private void unlock(File file) throws IOException {
        FileLock remove = this.locks.remove(file.getAbsolutePath());
        if (remove == null) {
            return;
        }
        remove.release();
        remove.channel().close();
    }
}
