/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.common.source;

import com.alibaba.ververica.connectors.common.source.reader.ParallelReader;
import com.alibaba.ververica.connectors.common.source.reader.RecordReader;
import com.alibaba.ververica.connectors.common.source.reader.SequenceReader;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

public abstract class AbstractParallelSourceBase<T, CURSOR extends Serializable>
extends InputFormatSourceFunction<T>
implements ResultTypeQueryable<T> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractParallelSourceBase.class);
    private static final long serialVersionUID = -7848357196819780804L;
    protected transient ParallelReader<T, CURSOR> parallelReader;
    protected transient List<Tuple2<InputSplit, CURSOR>> initialProgress;
    protected transient SequenceReader<T> sequenceReader;
    protected boolean recoveryFromState = false;
    protected boolean enableWatermarkEmitter = true;
    protected boolean disableParallelRead = false;
    protected boolean initInputSplitInMaster = false;
    protected boolean tracingMetricEnabled = true;
    protected long tracingSampleInterval = 100L;
    protected volatile boolean exitAfterReadFinished = false;

    public AbstractParallelSourceBase() {
        super(null, null);
    }

    public void disableWatermarkEmitter() {
        this.enableWatermarkEmitter = false;
    }

    public void enableExitAfterReadFinished() {
        this.exitAfterReadFinished = true;
    }

    public abstract RecordReader<T, CURSOR> createReader(Configuration var1) throws IOException;

    public abstract InputSplit[] createInputSplitsForCurrentSubTask(int var1, int var2) throws IOException;

    public void initOperator(Configuration config) throws IOException {
    }

    public abstract List<String> getPartitionList() throws Exception;

    public boolean isRecoveryFromState() {
        return this.recoveryFromState;
    }

    public void open(Configuration config) throws IOException {
        Map globalParametersMap;
        this.initOperator(config);
        StreamingRuntimeContext context = (StreamingRuntimeContext)this.getRuntimeContext();
        if (null != context && null != context.getExecutionConfig() && null != context.getExecutionConfig().getGlobalJobParameters() && null != (globalParametersMap = context.getExecutionConfig().getGlobalJobParameters().toMap()) && globalParametersMap.size() != 0) {
            for (String s : globalParametersMap.keySet()) {
                config.setString(s, (String)globalParametersMap.get(s));
            }
        }
        if (this.disableParallelRead) {
            this.createSequenceReader(config);
        } else {
            this.createParallelReader(config);
        }
        LOG.info("Init source succ.");
    }

    private void createSequenceReader(Configuration config) {
        Preconditions.checkArgument((this.initialProgress == null ? 1 : 0) != 0, (Object)"sequence read mode could not support checkpoint");
        StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext)this.getRuntimeContext();
        InputSplitProvider provider = runtimeContext.getInputSplitProvider();
        this.sequenceReader = new SequenceReader(this, provider, config);
    }

    protected void createParallelReader(Configuration config) throws IOException {
        if (this.initialProgress == null) {
            this.createInitialProgress();
        }
        long watermarkInterval = 0L;
        if (this.enableWatermarkEmitter) {
            watermarkInterval = this.getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
        }
        this.parallelReader = new ParallelReader(this.getRuntimeContext(), config, watermarkInterval, this.tracingMetricEnabled, this.tracingSampleInterval, (Clock)SystemClock.getInstance());
        this.parallelReader.setExitAfterReadFinished(this.exitAfterReadFinished);
        for (Tuple2<InputSplit, CURSOR> entry : this.initialProgress) {
            LOG.info("entry of initialProgress:{}", entry);
            RecordReader<T, CURSOR> reader = this.createReader(config);
            this.parallelReader.addRecordReader(reader, (InputSplit)entry.f0, (Serializable)entry.f1);
            LOG.info("Reader {} seeking to {}", entry.f0, (Object)String.valueOf(entry.f1));
        }
        this.getRuntimeContext().getMetricGroup().counter("partition").inc((long)this.initialProgress.size());
    }

    protected void createInitialProgress() throws IOException {
        this.initialProgress = new LinkedList<Tuple2<InputSplit, CURSOR>>();
        if (this.initInputSplitInMaster) {
            StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext)this.getRuntimeContext();
            InputSplitProvider provider = runtimeContext.getInputSplitProvider();
            try {
                InputSplit inputSplit = provider.getNextInputSplit(runtimeContext.getUserCodeClassLoader());
                while (inputSplit != null) {
                    this.initialProgress.add(Tuple2.of((Object)inputSplit, null));
                    inputSplit = provider.getNextInputSplit(runtimeContext.getUserCodeClassLoader());
                }
            }
            catch (InputSplitProviderException e) {
                throw new IOException("Get inputsplit from JM error.", e);
            }
        } else {
            InputSplit[] inputSplits;
            StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext)this.getRuntimeContext();
            for (InputSplit inputSplit : inputSplits = this.createInputSplitsForCurrentSubTask(runtimeContext.getNumberOfParallelSubtasks(), runtimeContext.getIndexOfThisSubtask())) {
                this.initialProgress.add(Tuple2.of((Object)inputSplit, null));
            }
        }
    }

    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        if (this.disableParallelRead) {
            this.sequenceReader.run(ctx);
        } else {
            this.parallelReader.run(ctx);
        }
    }

    public void close() throws IOException {
    }

    public void cancel() {
        if (this.parallelReader != null) {
            this.parallelReader.stop();
        }
        if (this.sequenceReader != null) {
            this.sequenceReader.stop();
        }
    }

    public TypeInformation<T> getProducedType() {
        ParameterizedType p = (ParameterizedType)((Object)((Object)this)).getClass().getGenericSuperclass();
        return TypeExtractor.createTypeInfo((Type)p.getActualTypeArguments()[0]);
    }

    public List<Tuple2<InputSplit, CURSOR>> getInitialProgress() {
        return this.initialProgress;
    }

    public InputFormat<T, InputSplit> getFormat() {
        return new ParallelSourceInputFormatWrapper(this);
    }

    public void disableParallelRead() {
        this.disableParallelRead = true;
    }

    public boolean getInitInputSplitInMaster() {
        return this.initInputSplitInMaster;
    }

    public void setInitInputSplitInMaster(boolean enabled) {
        this.initInputSplitInMaster = enabled;
    }

    public void enableParallelRead() {
        this.disableParallelRead = false;
    }

    public boolean isParallelReadDisabled() {
        return this.disableParallelRead;
    }

    public void enableTracingMetrics(int tracingSampleInterval) {
        this.tracingMetricEnabled = true;
        this.tracingSampleInterval = tracingSampleInterval;
    }

    public void disableTracingMetrics() {
        this.tracingMetricEnabled = false;
    }

    @VisibleForTesting
    public static class PreAssignedInputSplitAssigner
    implements InputSplitAssigner {
        private final Map<Integer, Integer> assignedSplits;
        private final Map<Integer, Deque<InputSplit>> unassignedSplitsByTask;

        public PreAssignedInputSplitAssigner(InputSplit[] inputSplits, int[] taskInputSplitSize, int[] taskInputSplitStartIndex) {
            this.assignedSplits = new HashMap<Integer, Integer>(inputSplits.length);
            this.unassignedSplitsByTask = new HashMap<Integer, Deque<InputSplit>>();
            List<InputSplit> splitList = Arrays.asList(inputSplits);
            for (int i = 0; i < taskInputSplitStartIndex.length; ++i) {
                int startingIndexForTask = taskInputSplitStartIndex[i];
                LinkedList<InputSplit> splitsForTask = new LinkedList<InputSplit>(splitList.subList(startingIndexForTask, taskInputSplitSize[i] + startingIndexForTask));
                this.unassignedSplitsByTask.put(i, splitsForTask);
            }
        }

        public InputSplit getNextInputSplit(String location, int taskIndex) {
            this.checkTaskIndex(taskIndex);
            Deque<InputSplit> unassignedSplitsForTask = this.unassignedSplitsByTask.get(taskIndex);
            InputSplit split = unassignedSplitsForTask.pollFirst();
            if (split != null) {
                this.assignedSplits.put(split.getSplitNumber(), taskIndex);
            }
            return split;
        }

        public void returnInputSplit(List<InputSplit> splits, int taskIndex) {
            this.checkTaskIndex(taskIndex);
            Deque<InputSplit> splitsForTask = this.unassignedSplitsByTask.get(taskIndex);
            for (InputSplit split : splits) {
                Integer assignedTask = this.assignedSplits.get(split.getSplitNumber());
                Preconditions.checkState((assignedTask != null && assignedTask == taskIndex ? 1 : 0) != 0, (Object)("Split " + split.getSplitNumber() + " was not assigned to task " + taskIndex));
                splitsForTask.addFirst(split);
            }
        }

        private void checkTaskIndex(int taskIndex) {
            Preconditions.checkArgument((taskIndex >= 0 && taskIndex < this.unassignedSplitsByTask.size() ? 1 : 0) != 0, (Object)"Fail to create");
        }
    }

    protected static class ParallelSourceInputFormatWrapper<T>
    implements InputFormat<T, InputSplit> {
        protected AbstractParallelSourceBase source;
        protected transient Configuration config;
        protected transient int[] taskInputSplitStartIndex;
        protected transient int[] taskInputSplitSize;

        public ParallelSourceInputFormatWrapper(AbstractParallelSourceBase<T, ?> source) {
            this.source = source;
        }

        public void configure(Configuration configuration) {
            this.config = configuration;
        }

        public InputSplit[] createInputSplits(int taskNum) throws IOException {
            if (!this.source.initInputSplitInMaster) {
                return new InputSplit[0];
            }
            this.taskInputSplitSize = new int[taskNum];
            this.taskInputSplitStartIndex = new int[taskNum];
            ArrayList<InputSplit> allSplits = new ArrayList<InputSplit>();
            this.source.initOperator(this.config);
            for (int i = 0; i < taskNum; ++i) {
                InputSplit[] inputSplits = this.source.createInputSplitsForCurrentSubTask(taskNum, i);
                this.taskInputSplitStartIndex[i] = allSplits.size();
                this.taskInputSplitSize[i] = inputSplits.length;
                for (InputSplit inputSplit : inputSplits) {
                    allSplits.add(inputSplit);
                }
            }
            this.source.close();
            return allSplits.toArray(new InputSplit[allSplits.size()]);
        }

        public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
            if (!this.source.initInputSplitInMaster || this.source.disableParallelRead) {
                return new DefaultInputSplitAssigner(inputSplits);
            }
            return new PreAssignedInputSplitAssigner(inputSplits, this.taskInputSplitSize, this.taskInputSplitStartIndex);
        }

        public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
            throw new NotImplementedException();
        }

        public boolean reachedEnd() throws IOException {
            throw new NotImplementedException();
        }

        public T nextRecord(T t) throws IOException {
            throw new NotImplementedException();
        }

        public void close() throws IOException {
            throw new NotImplementedException();
        }

        public void open(InputSplit inputSplit) throws IOException {
            throw new NotImplementedException();
        }
    }
}

