package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricRecorder;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.class */
public abstract class AbstractFetcher<T, KPH> {
    private static final int NO_TIMESTAMPS_WATERMARKS = 0;
    private static final int WITH_WATERMARK_GENERATOR = 1;
    protected final SourceFunction.SourceContext<T> sourceContext;
    protected final WatermarkOutput watermarkOutput;
    private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;
    protected final Object checkpointLock;
    private final List<KafkaTopicPartitionState<T, KPH>> subscribedPartitionStates;
    protected final ClosableBlockingQueue<KafkaTopicPartitionState<T, KPH>> unassignedPartitionsQueue;
    private final int timestampWatermarkMode;
    private final SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
    private final ClassLoader userCodeClassLoader;
    private final boolean useMetrics;
    private final MetricGroup consumerMetricGroup;

    @Deprecated
    private final MetricGroup legacyCurrentOffsetsMetricGroup;

    @Deprecated
    private final MetricGroup legacyCommittedOffsetsMetricGroup;
    protected final KafkaMetricRecorder kafkaMetricRecorder;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$OffsetGauge.class */
    public static class OffsetGauge implements Gauge<Long> {
        private final KafkaTopicPartitionState<?, ?> ktp;
        private final OffsetGaugeType gaugeType;

        OffsetGauge(KafkaTopicPartitionState<?, ?> kafkaTopicPartitionState, OffsetGaugeType offsetGaugeType) {
            this.ktp = kafkaTopicPartitionState;
            this.gaugeType = offsetGaugeType;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m553getValue() {
            switch (this.gaugeType) {
                case COMMITTED_OFFSET:
                    return Long.valueOf(this.ktp.getCommittedOffset());
                case CURRENT_OFFSET:
                    return Long.valueOf(this.ktp.getOffset());
                default:
                    throw new RuntimeException("Unknown gauge type: " + this.gaugeType);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$OffsetGaugeType.class */
    public enum OffsetGaugeType {
        CURRENT_OFFSET,
        COMMITTED_OFFSET
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$PeriodicWatermarkEmitter.class */
    private static class PeriodicWatermarkEmitter<T, KPH> implements ProcessingTimeCallback {
        private final Object checkpointLock;
        private final List<KafkaTopicPartitionState<T, KPH>> allPartitions;
        private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;
        private final ProcessingTimeService timerService;
        private final long interval;

        PeriodicWatermarkEmitter(Object obj, List<KafkaTopicPartitionState<T, KPH>> list, WatermarkOutputMultiplexer watermarkOutputMultiplexer, ProcessingTimeService processingTimeService, long j) {
            this.checkpointLock = obj;
            this.allPartitions = (List) Preconditions.checkNotNull(list);
            this.watermarkOutputMultiplexer = watermarkOutputMultiplexer;
            this.timerService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
            this.interval = j;
        }

        public void start() {
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, this);
        }

        public void onProcessingTime(long j) {
            synchronized (this.checkpointLock) {
                Iterator<KafkaTopicPartitionState<T, KPH>> it = this.allPartitions.iterator();
                while (it.hasNext()) {
                    it.next().onPeriodicEmit();
                }
                this.watermarkOutputMultiplexer.onPeriodicEmit();
            }
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, this);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<WatermarkStrategy<T>> serializedValue, ProcessingTimeService processingTimeService, long j, ClassLoader classLoader, MetricGroup metricGroup, MetricGroup metricGroup2, boolean z) throws Exception {
        this.kafkaMetricRecorder = new KafkaMetricRecorder((OperatorMetricGroup) metricGroup);
        this.kafkaMetricRecorder.registerSourceMetrics();
        this.sourceContext = (SourceFunction.SourceContext) Preconditions.checkNotNull(sourceContext);
        this.watermarkOutput = new SourceContextWatermarkOutputAdapter(sourceContext, this.kafkaMetricRecorder);
        this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(this.watermarkOutput);
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.userCodeClassLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
        this.useMetrics = z;
        this.consumerMetricGroup = (MetricGroup) Preconditions.checkNotNull(metricGroup2);
        this.legacyCurrentOffsetsMetricGroup = metricGroup2.addGroup(KafkaConsumerMetricConstants.LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
        this.legacyCommittedOffsetsMetricGroup = metricGroup2.addGroup(KafkaConsumerMetricConstants.LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);
        this.watermarkStrategy = serializedValue;
        if (serializedValue == null) {
            this.timestampWatermarkMode = 0;
        } else {
            this.timestampWatermarkMode = 1;
        }
        this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
        this.subscribedPartitionStates = createPartitionStateHolders(map, this.timestampWatermarkMode, serializedValue, classLoader);
        Iterator<KafkaTopicPartitionState<T, KPH>> it = this.subscribedPartitionStates.iterator();
        while (it.hasNext()) {
            if (!it.next().isOffsetDefined()) {
                throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
            }
        }
        Iterator<KafkaTopicPartitionState<T, KPH>> it2 = this.subscribedPartitionStates.iterator();
        while (it2.hasNext()) {
            this.unassignedPartitionsQueue.add(it2.next());
        }
        if (z) {
            registerOffsetMetrics(metricGroup2, this.subscribedPartitionStates);
        }
        if (this.timestampWatermarkMode != 1 || j <= 0) {
            return;
        }
        new PeriodicWatermarkEmitter(this.checkpointLock, this.subscribedPartitionStates, this.watermarkOutputMultiplexer, processingTimeService, j).start();
    }

    public void addDiscoveredPartitions(List<KafkaTopicPartition> list) throws IOException, ClassNotFoundException {
        List<KafkaTopicPartitionState<T, KPH>> createPartitionStateHolders = createPartitionStateHolders(list, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET, this.timestampWatermarkMode, this.watermarkStrategy, this.userCodeClassLoader);
        if (this.useMetrics) {
            registerOffsetMetrics(this.consumerMetricGroup, createPartitionStateHolders);
        }
        for (KafkaTopicPartitionState<T, KPH> kafkaTopicPartitionState : createPartitionStateHolders) {
            this.subscribedPartitionStates.add(kafkaTopicPartitionState);
            this.unassignedPartitionsQueue.add(kafkaTopicPartitionState);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final List<KafkaTopicPartitionState<T, KPH>> subscribedPartitionStates() {
        return this.subscribedPartitionStates;
    }

    public abstract void runFetchLoop() throws Exception;

    public abstract void cancel();

    public final void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> map, @Nonnull KafkaCommitCallback kafkaCommitCallback) throws Exception {
        doCommitInternalOffsetsToKafka(filterOutSentinels(map), kafkaCommitCallback);
    }

    protected abstract void doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> map, @Nonnull KafkaCommitCallback kafkaCommitCallback) throws Exception;

    private Map<KafkaTopicPartition, Long> filterOutSentinels(Map<KafkaTopicPartition, Long> map) {
        return (Map) map.entrySet().stream().filter(entry -> {
            return !KafkaTopicPartitionStateSentinel.isSentinel(((Long) entry.getValue()).longValue());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    protected abstract KPH createKafkaPartitionHandle(KafkaTopicPartition kafkaTopicPartition);

    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
        if (!$assertionsDisabled && !Thread.holdsLock(this.checkpointLock)) {
            throw new AssertionError();
        }
        HashMap<KafkaTopicPartition, Long> hashMap = new HashMap<>(this.subscribedPartitionStates.size());
        for (KafkaTopicPartitionState<T, KPH> kafkaTopicPartitionState : this.subscribedPartitionStates) {
            hashMap.put(kafkaTopicPartitionState.getKafkaTopicPartition(), Long.valueOf(kafkaTopicPartitionState.getOffset()));
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void emitRecordsWithTimestamps(Queue<T> queue, KafkaTopicPartitionState<T, KPH> kafkaTopicPartitionState, long j, long j2) {
        synchronized (this.checkpointLock) {
            while (true) {
                T poll = queue.poll();
                if (poll != null) {
                    long extractTimestamp = kafkaTopicPartitionState.extractTimestamp(poll, j2);
                    this.sourceContext.collectWithTimestamp(poll, extractTimestamp);
                    kafkaTopicPartitionState.onEvent(poll, extractTimestamp);
                } else {
                    kafkaTopicPartitionState.setOffset(j);
                }
            }
        }
    }

    private List<KafkaTopicPartitionState<T, KPH>> createPartitionStateHolders(Map<KafkaTopicPartition, Long> map, int i, SerializedValue<WatermarkStrategy<T>> serializedValue, ClassLoader classLoader) throws IOException, ClassNotFoundException {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        switch (i) {
            case 0:
                for (Map.Entry<KafkaTopicPartition, Long> entry : map.entrySet()) {
                    KafkaTopicPartitionState kafkaTopicPartitionState = new KafkaTopicPartitionState(entry.getKey(), createKafkaPartitionHandle(entry.getKey()));
                    kafkaTopicPartitionState.setOffset(entry.getValue().longValue());
                    copyOnWriteArrayList.add(kafkaTopicPartitionState);
                }
                return copyOnWriteArrayList;
            case 1:
                for (Map.Entry<KafkaTopicPartition, Long> entry2 : map.entrySet()) {
                    KafkaTopicPartition key = entry2.getKey();
                    KPH createKafkaPartitionHandle = createKafkaPartitionHandle(key);
                    WatermarkStrategy watermarkStrategy = (WatermarkStrategy) serializedValue.deserializeValue(classLoader);
                    String str = key.getTopic() + '-' + key.getPartition();
                    this.watermarkOutputMultiplexer.registerNewOutput(str);
                    KafkaTopicPartitionStateWithWatermarkGenerator kafkaTopicPartitionStateWithWatermarkGenerator = new KafkaTopicPartitionStateWithWatermarkGenerator(entry2.getKey(), createKafkaPartitionHandle, watermarkStrategy.createTimestampAssigner(() -> {
                        return this.consumerMetricGroup;
                    }), watermarkStrategy.createWatermarkGenerator(() -> {
                        return this.consumerMetricGroup;
                    }), this.watermarkOutputMultiplexer.getImmediateOutput(str), this.watermarkOutputMultiplexer.getDeferredOutput(str));
                    kafkaTopicPartitionStateWithWatermarkGenerator.setOffset(entry2.getValue().longValue());
                    copyOnWriteArrayList.add(kafkaTopicPartitionStateWithWatermarkGenerator);
                }
                return copyOnWriteArrayList;
            default:
                throw new RuntimeException();
        }
    }

    private List<KafkaTopicPartitionState<T, KPH>> createPartitionStateHolders(List<KafkaTopicPartition> list, long j, int i, SerializedValue<WatermarkStrategy<T>> serializedValue, ClassLoader classLoader) throws IOException, ClassNotFoundException {
        HashMap hashMap = new HashMap(list.size());
        Iterator<KafkaTopicPartition> it = list.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), Long.valueOf(j));
        }
        return createPartitionStateHolders(hashMap, i, serializedValue, classLoader);
    }

    private void registerOffsetMetrics(MetricGroup metricGroup, List<KafkaTopicPartitionState<T, KPH>> list) {
        for (KafkaTopicPartitionState<T, KPH> kafkaTopicPartitionState : list) {
            MetricGroup addGroup = metricGroup.addGroup("topic", kafkaTopicPartitionState.getTopic()).addGroup("partition", Integer.toString(kafkaTopicPartitionState.getPartition()));
            addGroup.gauge(KafkaConsumerMetricConstants.CURRENT_OFFSETS_METRICS_GAUGE, new OffsetGauge(kafkaTopicPartitionState, OffsetGaugeType.CURRENT_OFFSET));
            addGroup.gauge(KafkaConsumerMetricConstants.COMMITTED_OFFSETS_METRICS_GAUGE, new OffsetGauge(kafkaTopicPartitionState, OffsetGaugeType.COMMITTED_OFFSET));
            this.legacyCurrentOffsetsMetricGroup.gauge(getLegacyOffsetsMetricsGaugeName(kafkaTopicPartitionState), new OffsetGauge(kafkaTopicPartitionState, OffsetGaugeType.CURRENT_OFFSET));
            this.legacyCommittedOffsetsMetricGroup.gauge(getLegacyOffsetsMetricsGaugeName(kafkaTopicPartitionState), new OffsetGauge(kafkaTopicPartitionState, OffsetGaugeType.COMMITTED_OFFSET));
        }
    }

    private static String getLegacyOffsetsMetricsGaugeName(KafkaTopicPartitionState<?, ?> kafkaTopicPartitionState) {
        return kafkaTopicPartitionState.getTopic() + "-" + kafkaTopicPartitionState.getPartition();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    public long currentTimestampMs() {
        return System.currentTimeMillis();
    }

    static {
        $assertionsDisabled = !AbstractFetcher.class.desiredAssertionStatus();
    }
}
