package com.alibaba.ververica.connectors.common.source.reader;

import com.alibaba.ververica.connectors.common.MetricUtils;
import com.alibaba.ververica.connectors.common.metrics.Latency;
import com.alibaba.ververica.connectors.common.metrics.SourceMetricNames;
import com.alibaba.ververica.connectors.common.source.WatermarkProvider;
import java.io.IOException;
import java.io.Serializable;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/common/source/reader/ParallelReader.class */
public class ParallelReader<OUT, CURSOR extends Serializable> implements WatermarkProvider {
    private static final String SPLIT_PIPE_LEN_CONFIG = "yarn.app.blink.source.buffer-len";
    private static final String IDLE_INTERVAL_CONFIG = "yarn.app.blink.source.idle-interval";
    private static final long STOP_WAITING = 5;
    private long idleInterval;
    private int splitPipeLen;
    private final long watermarkInterval;
    private final RuntimeContext context;
    private Configuration config;
    private final Meter tpsMetric;
    private final boolean tracingMetricEnabled;
    private Latency partitionLatency;
    private Latency processLatency;
    private Gauge<Integer> partitionCount;
    private final long sampleInterval;
    private long inputCount;
    private final Clock clock;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ParallelReader.class);
    public static final Long INVALID = -1L;
    private final ExecutorService readerPool = Executors.newCachedThreadPool();
    private final BlockingQueue<ReaderRunner<OUT, CURSOR>> readerRunners = new LinkedBlockingQueue();
    private WatermarkEmitter<OUT> watermarkEmitter = null;
    private volatile boolean stop = false;
    private volatile boolean exitAfterReadFinished = false;
    private final Map<Integer, Long> splitIdToEmitLag = new ConcurrentHashMap();
    private final Map<Integer, Long> splitIdToSamplingCount = new HashMap();
    private transient Map<InputSplit, CURSOR> exitedReadRunnerSplitCursor = new HashMap();

    /* loaded from: input_file:com/alibaba/ververica/connectors/common/source/reader/ParallelReader$DelayGauge.class */
    public class DelayGauge implements Gauge<Long> {
        private final ConcurrentHashMap<Integer, Long> delayStats = new ConcurrentHashMap<>();
        private BlockingQueue<ReaderRunner<OUT, CURSOR>> readerRunners;
        private DelayKind delayKind;

        public DelayGauge(BlockingQueue<ReaderRunner<OUT, CURSOR>> blockingQueue, DelayKind delayKind) {
            this.readerRunners = blockingQueue;
            this.delayKind = delayKind;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m1148getValue() {
            if (this.delayKind == DelayKind.WATERMARK_DELAY) {
                long watermark = ParallelReader.this.getWatermark();
                if (watermark == Long.MIN_VALUE) {
                    return Long.MIN_VALUE;
                }
                return Long.valueOf(ParallelReader.this.clock.absoluteTimeMillis() - watermark);
            }
            if (null != this.readerRunners) {
                this.delayStats.clear();
                Iterator it = this.readerRunners.iterator();
                while (it.hasNext()) {
                    ReaderRunner readerRunner = (ReaderRunner) it.next();
                    switch (this.delayKind) {
                        case FETCHED_DELAY:
                            if (readerRunner.getFetchedDelay() > 0) {
                                this.delayStats.put(Integer.valueOf(readerRunner.getSplit().getSplitNumber()), Long.valueOf(readerRunner.getFetchedDelay()));
                                break;
                            } else {
                                break;
                            }
                        case NO_DATA_DELAY:
                            if (readerRunner.getDelay() > 0 && readerRunner.getFetchedDelay() > 0) {
                                long absoluteTimeMillis = (ParallelReader.this.clock.absoluteTimeMillis() - readerRunner.getDelay()) - readerRunner.getFetchedDelay();
                                if (absoluteTimeMillis > 10000) {
                                    this.delayStats.put(Integer.valueOf(readerRunner.getSplit().getSplitNumber()), Long.valueOf(absoluteTimeMillis));
                                    break;
                                } else {
                                    this.delayStats.put(Integer.valueOf(readerRunner.getSplit().getSplitNumber()), 0L);
                                    break;
                                }
                            }
                            break;
                    }
                }
            }
            while (true) {
                try {
                    long j = 0;
                    for (Map.Entry<Integer, Long> entry : this.delayStats.entrySet()) {
                        if (j < entry.getValue().longValue()) {
                            j = entry.getValue().longValue();
                        }
                    }
                    return Long.valueOf(j);
                } catch (ConcurrentModificationException e) {
                    ParallelReader.LOG.debug("Unable to report delay statistics", (Throwable) e);
                }
            }
        }
    }

    /* loaded from: input_file:com/alibaba/ververica/connectors/common/source/reader/ParallelReader$DelayKind.class */
    public enum DelayKind {
        FETCHED_DELAY,
        NO_DATA_DELAY,
        WATERMARK_DELAY
    }

    /* loaded from: input_file:com/alibaba/ververica/connectors/common/source/reader/ParallelReader$Progress.class */
    public static class Progress<CURSOR extends Serializable> implements Serializable {
        private Map<InputSplit, CURSOR> splitProgress = new HashMap();

        public void addProgress(InputSplit inputSplit, CURSOR cursor) {
            if (cursor != null) {
                this.splitProgress.put(inputSplit, cursor);
            }
        }

        public Map<InputSplit, CURSOR> getProgress() {
            return this.splitProgress;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/alibaba/ververica/connectors/common/source/reader/ParallelReader$ReaderRunner.class */
    public static class ReaderRunner<OUT, CURSOR extends Serializable> implements Runnable, WatermarkProvider, Comparable<ReaderRunner<OUT, CURSOR>> {
        private final RuntimeContext runtimeContext;
        private final RecordReader<OUT, CURSOR> recordReader;
        private final int splitPipeLen;
        private final long idleInterval;
        private final boolean isTracingMetricEnabled;
        private final long tracingInterval;
        private BlockingDeque<Tuple5<OUT, Long, Long, CURSOR, Long>> splitPipe;
        private InputSplit split;
        private volatile CURSOR progress;
        private volatile Throwable cause;
        private volatile Thread currentThread;
        private long traceStart;
        private volatile long watermark = Long.MIN_VALUE;
        private volatile boolean stop = false;
        private volatile boolean stopped = false;
        private volatile boolean finished = false;
        private long outputCount = 0;

        public ReaderRunner(RecordReader<OUT, CURSOR> recordReader, InputSplit inputSplit, CURSOR cursor, RuntimeContext runtimeContext, int i, long j, boolean z, long j2) {
            this.progress = null;
            this.recordReader = recordReader;
            this.split = inputSplit;
            this.progress = cursor;
            this.runtimeContext = runtimeContext;
            this.splitPipeLen = i;
            this.idleInterval = j;
            this.isTracingMetricEnabled = z;
            this.tracingInterval = j2;
            this.splitPipe = new LinkedBlockingDeque(i);
        }

        public InputSplit getSplit() {
            return this.split;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    this.recordReader.open(this.split, this.runtimeContext);
                    if (this.progress != null) {
                        ParallelReader.LOG.info("init progress not null, seek {}", this.progress);
                        this.recordReader.seek(this.progress);
                    }
                    this.currentThread = Thread.currentThread();
                    while (!this.stop && !this.finished) {
                        if (this.isTracingMetricEnabled) {
                            this.traceStart = System.nanoTime();
                            if (this.outputCount % this.tracingInterval == 0) {
                                getNextMessageWithTracing();
                            } else {
                                getNextMessage();
                            }
                            this.outputCount++;
                        } else {
                            getNextMessage();
                        }
                    }
                } catch (InterruptedException e) {
                    if (!this.stop) {
                        this.cause = e;
                    }
                    ParallelReader.LOG.info("Split reader " + this.split + " is interrupted.", (Throwable) e);
                    try {
                        if (this.recordReader != null) {
                            ParallelReader.LOG.info("Closing split {} reader runner.", this.split);
                            this.recordReader.close();
                        }
                    } catch (Throwable th) {
                        ParallelReader.LOG.error("Exception caught in closing record reader", th);
                        if (this.cause == null) {
                            this.cause = th;
                        }
                    }
                    this.stopped = true;
                    ParallelReader.LOG.info("Split reader {} thread will exit.", this.split);
                } catch (Throwable th2) {
                    this.cause = th2;
                    ParallelReader.LOG.error("Split reader " + this.split + " is failed cause: ", th2);
                    try {
                        if (this.recordReader != null) {
                            ParallelReader.LOG.info("Closing split {} reader runner.", this.split);
                            this.recordReader.close();
                        }
                    } catch (Throwable th3) {
                        ParallelReader.LOG.error("Exception caught in closing record reader", th3);
                        if (this.cause == null) {
                            this.cause = th3;
                        }
                    }
                    this.stopped = true;
                    ParallelReader.LOG.info("Split reader {} thread will exit.", this.split);
                }
            } finally {
                try {
                    if (this.recordReader != null) {
                        ParallelReader.LOG.info("Closing split {} reader runner.", this.split);
                        this.recordReader.close();
                    }
                } catch (Throwable th4) {
                    ParallelReader.LOG.error("Exception caught in closing record reader", th4);
                    if (this.cause == null) {
                        this.cause = th4;
                    }
                }
                this.stopped = true;
                ParallelReader.LOG.info("Split reader {} thread will exit.", this.split);
            }
        }

        private void getNextMessageWithTracing() throws IOException, InterruptedException {
            while (!this.stop && !this.finished) {
                this.finished = !this.recordReader.next();
                if (this.finished) {
                    ParallelReader.LOG.info("Finishing Split {}.", this.split);
                } else {
                    if (!this.recordReader.isHeartBeat()) {
                        put(this.recordReader.getMessage(), System.nanoTime() - this.traceStart);
                        return;
                    }
                    updateWatermarkAndProgress(this.recordReader.getWatermark(), this.recordReader.getProgress());
                }
            }
        }

        private void getNextMessage() throws IOException, InterruptedException {
            while (!this.stop && !this.finished) {
                this.finished = !this.recordReader.next();
                if (this.finished) {
                    ParallelReader.LOG.info("Finishing Split {}.", this.split);
                } else {
                    if (!this.recordReader.isHeartBeat()) {
                        put(this.recordReader.getMessage(), 0L);
                        return;
                    }
                    updateWatermarkAndProgress(this.recordReader.getWatermark(), this.recordReader.getProgress());
                }
            }
        }

        protected void put(OUT out, long j) throws IOException, InterruptedException {
            Tuple5<OUT, Long, Long, CURSOR, Long> tuple5 = new Tuple5<>(out, Long.valueOf(this.recordReader.getWatermark()), Long.valueOf(this.recordReader.getWatermark()), this.recordReader.getProgress(), Long.valueOf(j));
            while (!this.stop && !this.splitPipe.offer(tuple5, this.idleInterval, TimeUnit.MILLISECONDS)) {
            }
        }

        public void stop() {
            this.stop = true;
            if (this.recordReader instanceof Interruptible) {
                ((Interruptible) this.recordReader).interrupt();
            }
        }

        public boolean isStopped() {
            return this.stopped;
        }

        public boolean isExhausted() {
            return this.stop || (this.finished && this.splitPipe.isEmpty());
        }

        public Throwable getCause() {
            return this.cause;
        }

        public synchronized void updateWatermarkAndProgress(long j, CURSOR cursor) {
            Tuple5<OUT, Long, Long, CURSOR, Long> peekLast = this.splitPipe.peekLast();
            if (peekLast != null) {
                peekLast.f2 = Long.valueOf(j);
                peekLast.f3 = cursor;
            } else {
                this.watermark = j;
                this.progress = cursor;
            }
        }

        public boolean hasRecord() {
            return !this.splitPipe.isEmpty();
        }

        public synchronized Tuple3<OUT, Long, Long> pollRecord() {
            Tuple5<OUT, Long, Long, CURSOR, Long> poll = this.splitPipe.poll();
            if (poll == null) {
                return null;
            }
            this.watermark = ((Long) poll.f2).longValue();
            this.progress = (CURSOR) poll.f3;
            return new Tuple3<>(poll.f0, poll.f1, poll.f4);
        }

        public synchronized CURSOR getProgress() throws IOException {
            return this.progress;
        }

        @Override // com.alibaba.ververica.connectors.common.source.WatermarkProvider
        public synchronized long getWatermark() {
            return this.watermark;
        }

        @Override // java.lang.Comparable
        public int compareTo(ReaderRunner<OUT, CURSOR> readerRunner) {
            return Long.compare(getWatermark(), readerRunner.getWatermark());
        }

        public String getStackTrace() {
            if (this.currentThread == null || !this.currentThread.isAlive()) {
                return "No stack trace, maybe thread already exited.";
            }
            StringBuilder sb = new StringBuilder();
            for (StackTraceElement stackTraceElement : this.currentThread.getStackTrace()) {
                sb.append(stackTraceElement).append('\n');
            }
            return sb.toString();
        }

        public long getDelay() {
            return this.recordReader.getDelay();
        }

        public long getFetchedDelay() {
            return this.recordReader.getFetchedDelay();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/alibaba/ververica/connectors/common/source/reader/ParallelReader$WatermarkEmitter.class */
    public static class WatermarkEmitter<OUT> implements Runnable {
        private volatile boolean stopped = false;
        private ParallelReader provider;
        private SourceFunction.SourceContext<OUT> ctx;
        private long watermarkInterval;

        public WatermarkEmitter(ParallelReader parallelReader, long j, SourceFunction.SourceContext<OUT> sourceContext) {
            this.provider = parallelReader;
            this.ctx = sourceContext;
            this.watermarkInterval = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            long j = 0;
            while (!this.stopped) {
                Watermark watermark = new Watermark(this.provider.getWatermark());
                if (j != watermark.getTimestamp()) {
                    j = watermark.getTimestamp();
                    this.ctx.emitWatermark(watermark);
                }
                try {
                    Thread.sleep(this.watermarkInterval);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        public void stop() {
            this.stopped = true;
        }
    }

    public ParallelReader(RuntimeContext runtimeContext, Configuration configuration, long j, boolean z, long j2, Clock clock) {
        this.idleInterval = 10L;
        this.splitPipeLen = 10;
        this.context = runtimeContext;
        this.config = configuration;
        this.watermarkInterval = j;
        this.clock = clock;
        this.splitPipeLen = configuration.getInteger(SPLIT_PIPE_LEN_CONFIG, 10);
        this.idleInterval = configuration.getInteger(IDLE_INTERVAL_CONFIG, 10);
        LOG.info("idleInterval:" + this.idleInterval);
        LOG.info("splitPipeLen:" + this.splitPipeLen);
        runtimeContext.getMetricGroup().gauge(SourceMetricNames.CURRENT_EMIT_EVENT_TIME_LAG, () -> {
            long longValue = INVALID.longValue();
            for (Long l : this.splitIdToEmitLag.values()) {
                if (l.longValue() > longValue) {
                    longValue = l.longValue();
                }
            }
            return Long.valueOf(longValue);
        });
        runtimeContext.getMetricGroup().gauge(SourceMetricNames.CURRENT_FETCH_EVENT_TIME_LAG, new DelayGauge(this.readerRunners, DelayKind.FETCHED_DELAY));
        runtimeContext.getMetricGroup().gauge(SourceMetricNames.SOURCE_IDLE_TIME, new DelayGauge(this.readerRunners, DelayKind.NO_DATA_DELAY));
        runtimeContext.getMetricGroup().gauge(SourceMetricNames.WATERMARK_LAG, new DelayGauge(this.readerRunners, DelayKind.WATERMARK_DELAY));
        this.tpsMetric = MetricUtils.registerNumRecordBatchesInRate(runtimeContext);
        this.tracingMetricEnabled = z;
        this.sampleInterval = j2;
        if (this.tracingMetricEnabled) {
            this.partitionLatency = new Latency(SourceMetricNames.SOURCE_PARTITION_LATENCY, runtimeContext.getMetricGroup());
            this.processLatency = new Latency(SourceMetricNames.SOURCE_PROCESS_LATENCY, runtimeContext.getMetricGroup());
            this.partitionCount = runtimeContext.getMetricGroup().gauge(SourceMetricNames.SOURCE_PARTITION_COUNT, new Gauge<Integer>() { // from class: com.alibaba.ververica.connectors.common.source.reader.ParallelReader.1
                /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
                public Integer m1146getValue() {
                    int i = 0;
                    Iterator it = ParallelReader.this.readerRunners.iterator();
                    while (it.hasNext()) {
                        if (!((ReaderRunner) it.next()).finished) {
                            i++;
                        }
                    }
                    return Integer.valueOf(i);
                }
            });
        }
    }

    public ParallelReader<OUT, CURSOR> setExitAfterReadFinished(boolean z) {
        this.exitAfterReadFinished = z;
        return this;
    }

    public void addRecordReader(RecordReader<OUT, CURSOR> recordReader, InputSplit inputSplit, CURSOR cursor) throws IOException {
        this.readerRunners.add(new ReaderRunner<>(recordReader, inputSplit, cursor, this.context, this.splitPipeLen, this.idleInterval, this.tracingMetricEnabled, this.sampleInterval));
    }

    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        try {
            try {
                submitReaders();
                runWatermarkEmitter(sourceContext);
                runImpl(sourceContext);
                close();
            } catch (InterruptedException e) {
                LOG.error("ParallelReader was interrupted: ", (Throwable) e);
                close();
            } catch (Throwable th) {
                LOG.error("ParallelReader caught exception: ", th);
                throw new RuntimeException(th);
            }
        } catch (Throwable th2) {
            close();
            throw th2;
        }
    }

    private void runWatermarkEmitter(SourceFunction.SourceContext<OUT> sourceContext) {
        if (this.watermarkInterval > 0) {
            this.watermarkEmitter = new WatermarkEmitter<>(this, this.watermarkInterval, sourceContext);
            this.readerPool.submit(this.watermarkEmitter);
        }
    }

    public void stop() {
        this.stop = true;
    }

    public void close() {
        boolean z = false;
        try {
            try {
                Iterator it = this.readerRunners.iterator();
                while (it.hasNext()) {
                    ((ReaderRunner) it.next()).stop();
                }
                if (this.watermarkEmitter != null) {
                    this.watermarkEmitter.stop();
                }
                try {
                    Thread.sleep(100 * this.idleInterval);
                } catch (InterruptedException e) {
                    LOG.warn("Waiting for reader stopping is interrupted.", (Throwable) e);
                }
                this.readerPool.shutdownNow();
                z = this.readerPool.awaitTermination(STOP_WAITING, TimeUnit.SECONDS);
                if (!z) {
                    Iterator it2 = this.readerRunners.iterator();
                    while (it2.hasNext()) {
                        ReaderRunner readerRunner = (ReaderRunner) it2.next();
                        if (!readerRunner.isStopped()) {
                            LOG.info("Can not stop reader for split {}, it is stuck in method: \n {}.", readerRunner.getSplit(), readerRunner.getStackTrace());
                        }
                    }
                }
                if (!z) {
                    LOG.error("Shut down reader pool failed, exit process!");
                    System.exit(1);
                }
                LOG.info("Stopped all split reader.");
            } catch (Throwable th) {
                LOG.warn(th.toString());
                if (!z) {
                    LOG.error("Shut down reader pool failed, exit process!");
                    System.exit(1);
                }
                LOG.info("Stopped all split reader.");
            }
        } catch (Throwable th2) {
            if (!z) {
                LOG.error("Shut down reader pool failed, exit process!");
                System.exit(1);
            }
            LOG.info("Stopped all split reader.");
            throw th2;
        }
    }

    protected void runImpl(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        while (!this.stop && !this.readerRunners.isEmpty()) {
            Iterator it = this.readerRunners.iterator();
            boolean z = true;
            while (it.hasNext()) {
                ReaderRunner<OUT, CURSOR> readerRunner = (ReaderRunner) it.next();
                if (readerRunner.isStopped() && readerRunner.getCause() != null) {
                    LOG.error(String.format("SplitReader for split[%d][%s] failed, cause: %s", Integer.valueOf(readerRunner.getSplit().getSplitNumber()), readerRunner.getSplit().toString(), readerRunner.getCause()));
                    throw new RuntimeException(readerRunner.getCause());
                }
                if (readerRunner.isExhausted()) {
                    LOG.info(String.format("SplitReader for split[%d][%s] finished", Integer.valueOf(readerRunner.getSplit().getSplitNumber()), readerRunner.getSplit().toString()));
                    this.exitedReadRunnerSplitCursor.put(readerRunner.getSplit(), readerRunner.getProgress());
                    this.splitIdToEmitLag.remove(Integer.valueOf(readerRunner.getSplit().getSplitNumber()));
                    this.splitIdToSamplingCount.remove(Integer.valueOf(readerRunner.getSplit().getSplitNumber()));
                    it.remove();
                } else if (readerRunner.hasRecord()) {
                    z = false;
                    this.inputCount++;
                    if (this.tracingMetricEnabled && this.inputCount % this.sampleInterval == 0) {
                        long nanoTime = System.nanoTime();
                        processRecord(sourceContext, readerRunner);
                        this.processLatency.update(System.nanoTime() - nanoTime);
                    } else {
                        processRecord(sourceContext, readerRunner);
                    }
                }
            }
            if (z) {
                Thread.sleep(this.idleInterval);
            }
        }
        sourceContext.markAsTemporarilyIdle();
        LOG.info(String.format("This subTask [%d]/[%d] has finished, idle...", Integer.valueOf(this.context.getIndexOfThisSubtask()), Integer.valueOf(this.context.getNumberOfParallelSubtasks())));
        while (!this.stop && !this.exitAfterReadFinished) {
            Thread.sleep(1000L);
        }
    }

    private void processRecord(SourceFunction.SourceContext<OUT> sourceContext, ReaderRunner<OUT, CURSOR> readerRunner) {
        synchronized (sourceContext.getCheckpointLock()) {
            Tuple3<OUT, Long, Long> pollRecord = readerRunner.pollRecord();
            if (pollRecord != null) {
                sourceContext.collectWithTimestamp(pollRecord.f0, ((Long) pollRecord.f1).longValue());
                maybeSamplingMetric(readerRunner.getSplit().getSplitNumber(), ((Long) pollRecord.f1).longValue());
                this.tpsMetric.markEvent();
                if (((Long) pollRecord.f2).longValue() > 0) {
                    this.partitionLatency.update(((Long) pollRecord.f2).longValue());
                }
            }
        }
    }

    private void maybeSamplingMetric(int i, long j) {
        this.splitIdToSamplingCount.putIfAbsent(Integer.valueOf(i), Long.valueOf(this.sampleInterval));
        Long l = this.splitIdToSamplingCount.get(Integer.valueOf(i));
        if (l.longValue() + 1 < this.sampleInterval) {
            this.splitIdToSamplingCount.put(Integer.valueOf(i), Long.valueOf(l.longValue() + 1));
            return;
        }
        this.splitIdToEmitLag.put(Integer.valueOf(i), Long.valueOf(this.clock.absoluteTimeMillis() - j));
        this.splitIdToSamplingCount.put(Integer.valueOf(i), 0L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public Progress<CURSOR> getProgress() throws IOException {
        Progress<CURSOR> progress = (Progress<CURSOR>) new Progress();
        Iterator it = this.readerRunners.iterator();
        while (it.hasNext()) {
            ReaderRunner readerRunner = (ReaderRunner) it.next();
            progress.addProgress(readerRunner.getSplit(), readerRunner.getProgress());
        }
        for (Map.Entry<InputSplit, CURSOR> entry : this.exitedReadRunnerSplitCursor.entrySet()) {
            progress.addProgress(entry.getKey(), entry.getValue());
        }
        return progress;
    }

    protected void submitReaders() {
        Iterator it = this.readerRunners.iterator();
        while (it.hasNext()) {
            ReaderRunner readerRunner = (ReaderRunner) it.next();
            this.readerPool.submit(readerRunner);
            LOG.info(String.format("Start split reader for split[%d][%s]", Integer.valueOf(readerRunner.getSplit().getSplitNumber()), readerRunner.getSplit().toString()));
        }
    }

    @Override // com.alibaba.ververica.connectors.common.source.WatermarkProvider
    public long getWatermark() {
        long j = Long.MAX_VALUE;
        Iterator it = this.readerRunners.iterator();
        while (it.hasNext()) {
            ReaderRunner readerRunner = (ReaderRunner) it.next();
            j = j < readerRunner.getWatermark() ? j : readerRunner.getWatermark();
        }
        return j;
    }
}
