package com.alibaba.ververica.connectors.common.source;

import com.alibaba.ververica.connectors.common.source.reader.ParallelReader;
import com.alibaba.ververica.connectors.common.source.reader.RecordReader;
import com.alibaba.ververica.connectors.common.source.reader.SequenceReader;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

/* loaded from: input_file:com/alibaba/ververica/connectors/common/source/AbstractParallelSourceBase.class */
public abstract class AbstractParallelSourceBase<T, CURSOR extends Serializable> extends InputFormatSourceFunction<T> implements ResultTypeQueryable<T> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AbstractParallelSourceBase.class);
    private static final long serialVersionUID = -7848357196819780804L;
    protected transient ParallelReader<T, CURSOR> parallelReader;
    protected transient List<Tuple2<InputSplit, CURSOR>> initialProgress;
    protected transient SequenceReader<T> sequenceReader;
    protected boolean recoveryFromState;
    protected boolean enableWatermarkEmitter;
    protected boolean disableParallelRead;
    protected boolean initInputSplitInMaster;
    protected boolean tracingMetricEnabled;
    protected long tracingSampleInterval;
    protected volatile boolean exitAfterReadFinished;

    /* loaded from: input_file:com/alibaba/ververica/connectors/common/source/AbstractParallelSourceBase$ParallelSourceInputFormatWrapper.class */
    protected static class ParallelSourceInputFormatWrapper<T> implements InputFormat<T, InputSplit> {
        protected AbstractParallelSourceBase source;
        protected transient Configuration config;
        protected transient int[] taskInputSplitStartIndex;
        protected transient int[] taskInputSplitSize;

        public ParallelSourceInputFormatWrapper(AbstractParallelSourceBase<T, ?> abstractParallelSourceBase) {
            this.source = abstractParallelSourceBase;
        }

        public void configure(Configuration configuration) {
            this.config = configuration;
        }

        public InputSplit[] createInputSplits(int i) throws IOException {
            if (!this.source.initInputSplitInMaster) {
                return new InputSplit[0];
            }
            this.taskInputSplitSize = new int[i];
            this.taskInputSplitStartIndex = new int[i];
            ArrayList arrayList = new ArrayList();
            this.source.initOperator(this.config);
            for (int i2 = 0; i2 < i; i2++) {
                InputSplit[] mo1139createInputSplitsForCurrentSubTask = this.source.mo1139createInputSplitsForCurrentSubTask(i, i2);
                this.taskInputSplitStartIndex[i2] = arrayList.size();
                this.taskInputSplitSize[i2] = mo1139createInputSplitsForCurrentSubTask.length;
                for (InputSplit inputSplit : mo1139createInputSplitsForCurrentSubTask) {
                    arrayList.add(inputSplit);
                }
            }
            this.source.close();
            return (InputSplit[]) arrayList.toArray(new InputSplit[arrayList.size()]);
        }

        public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplitArr) {
            return (!this.source.initInputSplitInMaster || this.source.disableParallelRead) ? new DefaultInputSplitAssigner(inputSplitArr) : new PreAssignedInputSplitAssigner(inputSplitArr, this.taskInputSplitSize, this.taskInputSplitStartIndex);
        }

        public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
            throw new NotImplementedException();
        }

        public boolean reachedEnd() throws IOException {
            throw new NotImplementedException();
        }

        public T nextRecord(T t) throws IOException {
            throw new NotImplementedException();
        }

        public void close() throws IOException {
            throw new NotImplementedException();
        }

        public void open(InputSplit inputSplit) throws IOException {
            throw new NotImplementedException();
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:com/alibaba/ververica/connectors/common/source/AbstractParallelSourceBase$PreAssignedInputSplitAssigner.class */
    public static class PreAssignedInputSplitAssigner implements InputSplitAssigner {
        private final Map<Integer, Integer> assignedSplits;
        private final Map<Integer, Deque<InputSplit>> unassignedSplitsByTask = new HashMap();

        public PreAssignedInputSplitAssigner(InputSplit[] inputSplitArr, int[] iArr, int[] iArr2) {
            this.assignedSplits = new HashMap(inputSplitArr.length);
            List asList = Arrays.asList(inputSplitArr);
            for (int i = 0; i < iArr2.length; i++) {
                int i2 = iArr2[i];
                this.unassignedSplitsByTask.put(Integer.valueOf(i), new LinkedList(asList.subList(i2, iArr[i] + i2)));
            }
        }

        public InputSplit getNextInputSplit(String str, int i) {
            checkTaskIndex(i);
            InputSplit pollFirst = this.unassignedSplitsByTask.get(Integer.valueOf(i)).pollFirst();
            if (pollFirst != null) {
                this.assignedSplits.put(Integer.valueOf(pollFirst.getSplitNumber()), Integer.valueOf(i));
            }
            return pollFirst;
        }

        public void returnInputSplit(List<InputSplit> list, int i) {
            checkTaskIndex(i);
            Deque<InputSplit> deque = this.unassignedSplitsByTask.get(Integer.valueOf(i));
            for (InputSplit inputSplit : list) {
                Integer num = this.assignedSplits.get(Integer.valueOf(inputSplit.getSplitNumber()));
                Preconditions.checkState(num != null && num.intValue() == i, "Split " + inputSplit.getSplitNumber() + " was not assigned to task " + i);
                deque.addFirst(inputSplit);
            }
        }

        private void checkTaskIndex(int i) {
            Preconditions.checkArgument(i >= 0 && i < this.unassignedSplitsByTask.size(), "Fail to create");
        }
    }

    public AbstractParallelSourceBase() {
        super((InputFormat) null, (TypeInformation) null);
        this.recoveryFromState = false;
        this.enableWatermarkEmitter = true;
        this.disableParallelRead = false;
        this.initInputSplitInMaster = false;
        this.tracingMetricEnabled = true;
        this.tracingSampleInterval = 100L;
        this.exitAfterReadFinished = false;
    }

    public void disableWatermarkEmitter() {
        this.enableWatermarkEmitter = false;
    }

    public void enableExitAfterReadFinished() {
        this.exitAfterReadFinished = true;
    }

    public abstract RecordReader<T, CURSOR> createReader(Configuration configuration) throws IOException;

    /* renamed from: createInputSplitsForCurrentSubTask */
    public abstract InputSplit[] mo1139createInputSplitsForCurrentSubTask(int i, int i2) throws IOException;

    public void initOperator(Configuration configuration) throws IOException {
    }

    public abstract List<String> getPartitionList() throws Exception;

    public boolean isRecoveryFromState() {
        return this.recoveryFromState;
    }

    public void open(Configuration configuration) throws IOException {
        Map map;
        initOperator(configuration);
        StreamingRuntimeContext runtimeContext = getRuntimeContext();
        if (null != runtimeContext && null != runtimeContext.getExecutionConfig() && null != runtimeContext.getExecutionConfig().getGlobalJobParameters() && null != (map = runtimeContext.getExecutionConfig().getGlobalJobParameters().toMap()) && map.size() != 0) {
            for (String str : map.keySet()) {
                configuration.setString(str, (String) map.get(str));
            }
        }
        if (this.disableParallelRead) {
            createSequenceReader(configuration);
        } else {
            createParallelReader(configuration);
        }
        LOG.info("Init source succ.");
    }

    private void createSequenceReader(Configuration configuration) {
        Preconditions.checkArgument(this.initialProgress == null, "sequence read mode could not support checkpoint");
        this.sequenceReader = new SequenceReader<>(this, getRuntimeContext().getInputSplitProvider(), configuration);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public void createParallelReader(Configuration configuration) throws IOException {
        if (this.initialProgress == null) {
            createInitialProgress();
        }
        this.parallelReader = new ParallelReader<>(getRuntimeContext(), configuration, this.enableWatermarkEmitter ? getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval() : 0L, this.tracingMetricEnabled, this.tracingSampleInterval, SystemClock.getInstance());
        this.parallelReader.setExitAfterReadFinished(this.exitAfterReadFinished);
        for (Tuple2<InputSplit, CURSOR> tuple2 : this.initialProgress) {
            LOG.info("entry of initialProgress:{}", tuple2);
            this.parallelReader.addRecordReader(createReader(configuration), (InputSplit) tuple2.f0, (Serializable) tuple2.f1);
            LOG.info("Reader {} seeking to {}", tuple2.f0, String.valueOf(tuple2.f1));
        }
        getRuntimeContext().getMetricGroup().counter("partition").inc(this.initialProgress.size());
    }

    protected void createInitialProgress() throws IOException {
        this.initialProgress = new LinkedList();
        if (this.initInputSplitInMaster) {
            StreamingRuntimeContext runtimeContext = getRuntimeContext();
            InputSplitProvider inputSplitProvider = runtimeContext.getInputSplitProvider();
            try {
                for (InputSplit nextInputSplit = inputSplitProvider.getNextInputSplit(runtimeContext.getUserCodeClassLoader()); nextInputSplit != null; nextInputSplit = inputSplitProvider.getNextInputSplit(runtimeContext.getUserCodeClassLoader())) {
                    this.initialProgress.add(Tuple2.of(nextInputSplit, (Object) null));
                }
                return;
            } catch (InputSplitProviderException e) {
                throw new IOException("Get inputsplit from JM error.", e);
            }
        }
        StreamingRuntimeContext runtimeContext2 = getRuntimeContext();
        for (InputSplit inputSplit : mo1139createInputSplitsForCurrentSubTask(runtimeContext2.getNumberOfParallelSubtasks(), runtimeContext2.getIndexOfThisSubtask())) {
            this.initialProgress.add(Tuple2.of(inputSplit, (Object) null));
        }
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        if (this.disableParallelRead) {
            this.sequenceReader.run(sourceContext);
        } else {
            this.parallelReader.run(sourceContext);
        }
    }

    public void close() throws IOException {
    }

    public void cancel() {
        if (this.parallelReader != null) {
            this.parallelReader.stop();
        }
        if (this.sequenceReader != null) {
            this.sequenceReader.stop();
        }
    }

    public TypeInformation<T> getProducedType() {
        return TypeExtractor.createTypeInfo(((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0]);
    }

    public List<Tuple2<InputSplit, CURSOR>> getInitialProgress() {
        return this.initialProgress;
    }

    public InputFormat<T, InputSplit> getFormat() {
        return new ParallelSourceInputFormatWrapper(this);
    }

    public void disableParallelRead() {
        this.disableParallelRead = true;
    }

    public boolean getInitInputSplitInMaster() {
        return this.initInputSplitInMaster;
    }

    public void setInitInputSplitInMaster(boolean z) {
        this.initInputSplitInMaster = z;
    }

    public void enableParallelRead() {
        this.disableParallelRead = false;
    }

    public boolean isParallelReadDisabled() {
        return this.disableParallelRead;
    }

    public void enableTracingMetrics(int i) {
        this.tracingMetricEnabled = true;
        this.tracingSampleInterval = i;
    }

    public void disableTracingMetrics() {
        this.tracingMetricEnabled = false;
    }
}
