package org.apache.flume.sink.hdfs;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.sink.FlumeFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/hdfs/BucketWriter.class */
/**
 * Writes events to one "bucket" file through an {@link HDFSWriter}, rolling
 * over to a new file when a time, event-count or size threshold is reached.
 *
 * <p>While a file is being written it carries the {@code .tmp} suffix; on
 * {@link #close()} it is renamed to its final name (the same path without the
 * suffix).
 *
 * <p>{@link #append(Event)}, {@link #flush()} and {@link #close()} are
 * {@code synchronized} on this instance. {@link #isBatchComplete()} is not
 * synchronized and reads a {@code volatile} counter.
 */
class BucketWriter {
    /** Suffix appended to the bucket path while the file is still being written. */
    private static final String IN_USE_EXT = ".tmp";
    private static final Logger LOG = LoggerFactory.getLogger(BucketWriter.class);
    /**
     * Monitor shared by every instance; held while a new file is created in
     * {@link #open()}. A dedicated {@code Object} is used instead of a boxed
     * {@code Integer} so the lock can never be confused with a value object.
     */
    private static final Object staticLock = new Object();

    private final HDFSWriter writer;
    private final FlumeFormatter formatter;
    /** Roll threshold in seconds; values {@code <= 0} disable time-based rolling. */
    private final long rollInterval;
    /** Roll threshold in event-body bytes; values {@code <= 0} disable size-based rolling. */
    private final long rollSize;
    /** Roll threshold in events; values {@code <= 0} disable count-based rolling. */
    private final long rollCount;
    /** Number of appended events after which {@link #flush()} is triggered automatically. */
    private final long batchSize;
    /** Compression codec, or {@code null} to open the writer without compression arguments. */
    private final CompressionCodec codeC;
    private final SequenceFile.CompressionType compType;
    private final Context context;
    /** Base path; a numeric counter (and codec extension, if any) is appended per file. */
    private final String filePath;

    /** Events appended to the current file since it was opened. */
    private long eventCounter;
    /** Sum of event body lengths appended to the current file since it was opened. */
    private long processSize;
    /** Wall-clock time (ms) at which the current file was opened. */
    private long lastRollTime;
    /** Set in {@link #open()}; cleared again after a successful rename in {@link #close()}. */
    private FileSystem fileSystem;
    /** Final (post-rename) path of the current file; {@code null} until the first open. */
    private volatile String bucketPath;
    /** Events appended since the last flush (or since the file was opened). */
    private volatile long batchCounter;
    private volatile boolean isOpen = false;
    /** Source of the numeric file-name suffix; seeded with the construction time in ms. */
    private final AtomicLong fileExtensionCounter = new AtomicLong(System.currentTimeMillis());

    /**
     * Creates a writer and immediately configures {@code writer} with {@code context}.
     *
     * @param rollInterval seconds after which the file is rolled ({@code <= 0} disables)
     * @param rollSize     accumulated event-body bytes after which the file is rolled
     *                     ({@code <= 0} disables)
     * @param rollCount    number of events after which the file is rolled ({@code <= 0} disables)
     * @param batchSize    number of appended events that triggers an automatic flush
     * @param context      configuration handed to {@code writer.configure(...)}
     * @param filePath     base path of the files to create
     * @param codeC        compression codec, or {@code null} for none
     * @param compType     compression type passed to the writer when a codec is set
     * @param writer       underlying writer; must not be {@code null} because it is
     *                     configured here
     * @param formatter    formatter passed to the writer on open and append
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, Context context, String filePath, CompressionCodec codeC, SequenceFile.CompressionType compType, HDFSWriter writer, FlumeFormatter formatter) {
        this.rollInterval = rollInterval;
        this.rollSize = rollSize;
        this.rollCount = rollCount;
        this.batchSize = batchSize;
        this.context = context;
        this.filePath = filePath;
        this.codeC = codeC;
        this.compType = compType;
        this.writer = writer;
        this.formatter = formatter;
        this.writer.configure(this.context);
    }

    /** Resets the per-file and per-batch counters and restarts the roll timer. */
    private void resetCounters() {
        eventCounter = 0L;
        processSize = 0L;
        lastRollTime = System.currentTimeMillis();
        batchCounter = 0L;
    }

    /**
     * Opens a new in-use ({@code .tmp}) file with a fresh numeric suffix and
     * resets all counters.
     *
     * @throws IOException if the file path, writer or formatter is {@code null},
     *                     or if resolving the file system or opening the writer fails
     */
    private void open() throws IOException {
        if (filePath == null || writer == null || formatter == null) {
            throw new IOException("Invalid file settings");
        }
        Configuration config = new Configuration();
        // Hadoop setting: do not auto-close FileSystem instances at JVM shutdown.
        config.setBoolean("fs.automatic.close", false);
        synchronized (staticLock) {
            long suffix = fileExtensionCounter.incrementAndGet();
            String basePath = filePath + "." + suffix;
            bucketPath = (codeC == null) ? basePath : basePath + codeC.getDefaultExtension();
            fileSystem = new Path(bucketPath).getFileSystem(config);
            String inUsePath = bucketPath + IN_USE_EXT;
            LOG.info("Creating " + inUsePath);
            if (codeC == null) {
                writer.open(inUsePath, formatter);
            } else {
                writer.open(inUsePath, codeC, compType, formatter);
            }
        }
        resetCounters();
        isOpen = true;
    }

    /**
     * Closes the underlying writer (if open) and renames the in-use file to
     * its final name.
     *
     * <p>A failure of {@code writer.close()} is logged and otherwise ignored so
     * that the rename is still attempted.
     *
     * @throws IOException if checking for, or renaming, the in-use file fails
     */
    public synchronized void close() throws IOException {
        String inUsePath = bucketPath + IN_USE_EXT;
        LOG.debug("Closing {}", inUsePath);
        if (isOpen) {
            try {
                writer.close();
            } catch (IOException e) {
                // Best effort: keep going so the file can still be renamed.
                LOG.warn("failed to close() HDFSWriter for file (" + inUsePath + "). Exception follows.", e);
            }
            isOpen = false;
        } else {
            LOG.info("HDFSWriter is already closed: {}", inUsePath);
        }
        if (bucketPath == null || fileSystem == null) {
            // Never opened, or already renamed by an earlier close().
            return;
        }
        renameBucket();
        // Only cleared after a rename attempt that did not throw, so a later
        // close() can retry after an IOException.
        fileSystem = null;
    }

    /**
     * Syncs the underlying writer and marks the current batch as complete.
     *
     * @throws IOException if the writer's {@code sync()} fails
     */
    public synchronized void flush() throws IOException {
        writer.sync();
        batchCounter = 0L;
    }

    /**
     * Appends one event, opening the file first if necessary and rolling to a
     * new file when any roll threshold has been reached. Triggers a flush once
     * the number of events appended since the last flush equals the batch size.
     *
     * <p>If writing or flushing fails, the file is closed and the original
     * exception is rethrown.
     *
     * @param event the event to write
     * @throws IOException if opening, rolling, appending or flushing fails
     */
    public synchronized void append(Event event) throws IOException {
        if (!isOpen) {
            open();
        }
        if (shouldRotate()) {
            close();
            open();
        }
        try {
            writer.append(event, formatter);
            processSize += event.getBody().length;
            eventCounter++;
            batchCounter++;
            // Equality (not >=) is intentional: a non-positive batch size
            // never triggers an automatic flush.
            if (batchCounter == batchSize) {
                flush();
            }
        } catch (IOException e) {
            LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" + bucketPath + IN_USE_EXT + ") and rethrowing exception.", e.getMessage());
            try {
                close();
            } catch (IOException closeFailure) {
                LOG.warn("Caught IOException while closing file (" + bucketPath + IN_USE_EXT + "). Exception follows.", closeFailure);
            }
            throw e;
        }
    }

    /**
     * Reports whether any enabled roll threshold (elapsed seconds, event
     * count, accumulated body bytes) has been reached for the current file.
     * Every reached threshold is logged at debug level.
     */
    private boolean shouldRotate() {
        boolean rotate = false;
        long elapsedSeconds = (System.currentTimeMillis() - lastRollTime) / 1000;
        if (rollInterval > 0 && rollInterval <= elapsedSeconds) {
            LOG.debug("rolling: rollTime: {}, elapsed: {}", rollInterval, elapsedSeconds);
            rotate = true;
        }
        if (rollCount > 0 && rollCount <= eventCounter) {
            LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
            rotate = true;
        }
        if (rollSize > 0 && rollSize <= processSize) {
            LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
            rotate = true;
        }
        return rotate;
    }

    /**
     * Renames the in-use ({@code .tmp}) file to its final name if it exists.
     * A rename that reports failure by returning {@code false} is logged as a
     * warning instead of being silently ignored.
     *
     * @throws IOException if the existence check or the rename throws
     */
    private void renameBucket() throws IOException {
        Path inUsePath = new Path(bucketPath + IN_USE_EXT);
        Path finalPath = new Path(bucketPath);
        if (fileSystem.exists(inUsePath)) {
            LOG.info("Renaming " + inUsePath + " to " + finalPath);
            if (!fileSystem.rename(inUsePath, finalPath)) {
                LOG.warn("Renaming " + inUsePath + " to " + finalPath + " failed: rename() returned false");
            }
        }
    }

    @Override
    public String toString() {
        return "[ " + getClass().getSimpleName() + " filePath = " + filePath + ", bucketPath = " + bucketPath + " ]";
    }

    /**
     * Returns {@code true} when no events have been appended since the last
     * flush (or since the current file was opened).
     */
    public boolean isBatchComplete() {
        return batchCounter == 0;
    }
}
