/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.common.source.reader;

import com.alibaba.ververica.connectors.common.MetricUtils;
import com.alibaba.ververica.connectors.common.source.AbstractParallelSourceBase;
import com.alibaba.ververica.connectors.common.source.reader.RecordReader;
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Reads input splits one after another and forwards their records to a
 * {@link SourceFunction.SourceContext}.
 *
 * <p>Splits are pulled from the supplied {@link InputSplitProvider}; for each split a new
 * {@link RecordReader} is created by the owning source function, opened, drained and closed
 * before the next split is requested.
 *
 * <p>{@link #stop()} only sets a {@code volatile} flag that {@link #run} polls, so it is
 * presumably meant to be called from a thread other than the one executing {@code run}.
 *
 * @param <T> type of the records emitted to the source context
 */
public class SequenceReader<T> {
    private final InputSplitProvider inputSplitProvider;
    private final AbstractParallelSourceBase<T, ?> sourceFunction;
    private final Configuration config;
    private final Meter tpsMetric;
    /** Set by {@link #stop()}; polled by {@link #run} between splits and between records. */
    private volatile boolean isStop = false;

    /**
     * Creates a reader bound to the given source function.
     *
     * @param source source function that creates the per-split readers and supplies the runtime context
     * @param provider provider from which the input splits are requested
     * @param config configuration handed to {@code source.createReader(...)} for every split
     */
    public SequenceReader(AbstractParallelSourceBase<T, ?> source, InputSplitProvider provider, Configuration config) {
        this.sourceFunction = source;
        this.inputSplitProvider = provider;
        this.config = config;
        this.tpsMetric = MetricUtils.registerNumRecordBatchesInRate(source.getRuntimeContext());
    }

    /**
     * Processes input splits sequentially until the provider returns {@code null} or
     * {@link #stop()} has been called.
     *
     * <p>Records for which the reader reports {@code isHeartBeat()} are skipped. Every other
     * record is emitted while holding the lock returned by {@code ctx.getCheckpointLock()},
     * and the rate meter is marked under the same lock.
     *
     * <p>The stop flag is checked <em>before</em> a split is requested, so no split is fetched
     * from the provider (and then dropped unread) once the reader has been stopped.
     *
     * @param ctx source context that receives the records
     * @throws InputSplitProviderException if requesting the next input split fails
     * @throws IOException if reading from or closing a record reader fails
     * @throws InterruptedException if the reading thread is interrupted
     */
    public void run(SourceFunction.SourceContext<T> ctx) throws InputSplitProviderException, IOException, InterruptedException {
        while (!this.isStop) {
            InputSplit inputSplit = this.inputSplitProvider.getNextInputSplit(
                    this.sourceFunction.getRuntimeContext().getUserCodeClassLoader());
            if (inputSplit == null) {
                // The provider has no more splits for this reader.
                return;
            }
            // try-with-resources closes the reader even when reading fails or stop() interrupts the loop.
            try (RecordReader<T, ?> recordReader = this.sourceFunction.createReader(this.config)) {
                recordReader.open(inputSplit, this.sourceFunction.getRuntimeContext());
                while (!this.isStop && recordReader.next()) {
                    if (recordReader.isHeartBeat()) {
                        // Heartbeats carry no message to emit.
                        continue;
                    }
                    synchronized (ctx.getCheckpointLock()) {
                        this.tpsMetric.markEvent();
                        ctx.collect(recordReader.getMessage());
                    }
                }
            }
        }
    }

    /**
     * Asks {@link #run} to finish. The flag is observed before the next record is read and
     * before the next split is requested; the currently open reader is closed by {@code run}.
     */
    public void stop() {
        this.isStop = true;
    }
}

