package shaded.com.aliyun.datahub.clientlibrary.consumer;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.com.aliyun.datahub.client.exception.DatahubClientException;
import shaded.com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
import shaded.com.aliyun.datahub.client.model.SubscriptionOffset;
import shaded.com.aliyun.datahub.clientlibrary.common.BackEndTask;
import shaded.com.aliyun.datahub.clientlibrary.common.ClientProvider;
import shaded.com.aliyun.datahub.clientlibrary.common.Constants;
import shaded.com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import shaded.com.aliyun.datahub.clientlibrary.exception.ClientException;
import shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer;
import shaded.com.aliyun.datahub.clientlibrary.models.Assignment;
import shaded.com.aliyun.datahub.clientlibrary.models.Offset;
import shaded.com.aliyun.datahub.clientlibrary.models.OffsetManager;
import shaded.com.aliyun.datahub.clientlibrary.models.OffsetWrapper;
import shaded.com.aliyun.datahub.clientlibrary.models.RecordKeyImpl;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:shaded/com/aliyun/datahub/clientlibrary/consumer/OffsetCoordinator.class */
public class OffsetCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) OffsetCoordinator.class);
    private static final long MIN_ASYNC_COMMIT_INTERVAL_MS = 3000;
    private String projectName;
    private String topicName;
    private String subId;
    private String consumerId;
    private ConsumerConfig config;
    private ClientProvider clientProvider;
    private CheckpointTask checkpointTask;
    private ShardCoordinator shardCoordinator = null;
    private volatile long lastCommitTime = -1;
    private volatile long lastLogTime = -1;
    private final OffsetManager offsetManager = new OffsetManager();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Object commitLock = new Object();
    private final Object updateLock = new Object();
    private CommitTask commitTask = new CommitTask();

    /* loaded from: input_file:shaded/com/aliyun/datahub/clientlibrary/consumer/OffsetCoordinator$CheckpointTask.class */
    private class CheckpointTask extends BackEndTask {
        CheckpointTask() {
            this.taskName = "checkpoint-task";
        }

        @Override // shaded.com.aliyun.datahub.clientlibrary.common.BackEndTask
        protected void run() {
            OffsetCoordinator.LOG.info("Backend Checkpoint task start, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId);
            while (isRunning()) {
                try {
                    OffsetCoordinator.this.offsetManager.getHeldOffsets().entrySet().parallelStream().forEach(entry -> {
                        OffsetCoordinator.this.flushAckedOffset((String) entry.getKey(), (OffsetWrapper) entry.getValue(), this::isRunning);
                    });
                    OffsetCoordinator.this.offsetManager.getReleasingOffsets().entrySet().parallelStream().forEach(entry2 -> {
                        OffsetCoordinator.this.flushAckedOffset((String) entry2.getKey(), (OffsetWrapper) entry2.getValue(), this::isRunning);
                    });
                    if (OffsetCoordinator.this.offsetManager.hasShardToRelease()) {
                        OffsetCoordinator.this.commitTask.triggerUpdate();
                    }
                    waitSignal(1000L);
                } catch (Throwable th) {
                    OffsetCoordinator.LOG.error("Check failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, th);
                }
            }
            OffsetCoordinator.LOG.info("Backend Checkpoint task stop, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId);
        }
    }

    /* loaded from: input_file:shaded/com/aliyun/datahub/clientlibrary/consumer/OffsetCoordinator$CommitTask.class */
    private class CommitTask extends BackEndTask {
        private volatile DatahubClientException exception = null;
        private int retryTime = 0;

        CommitTask() {
            this.taskName = "commit-task";
        }

        boolean checkRunning() {
            if (this.exception == null) {
                return !isStopped();
            }
            DatahubClientException datahubClientException = this.exception;
            this.exception = null;
            throw datahubClientException;
        }

        @Override // shaded.com.aliyun.datahub.clientlibrary.common.BackEndTask
        protected void run() {
            OffsetCoordinator.LOG.info("Backend Commit task start, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, IntervalMs: {}", OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, Long.valueOf(OffsetCoordinator.this.config.getOffsetCommitTimeoutMs()));
            while (true) {
                if (!isRunning()) {
                    break;
                }
                long j = 3000;
                try {
                    if (OffsetCoordinator.this.commitReleased() && OffsetCoordinator.this.commit(false)) {
                        j = OffsetCoordinator.this.config.getOffsetCommitTimeoutMs();
                    }
                    this.retryTime = 0;
                } catch (DatahubClientException e) {
                    if (ExceptionRetryer.canRetry(e) && this.retryTime < 3) {
                        this.retryTime++;
                        OffsetCoordinator.LOG.info("Committed failed, try next time, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, e);
                    } else if (!ExceptionRetryer.canSwallow(e)) {
                        this.exception = e;
                        OffsetCoordinator.LOG.error("Committed failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, e);
                    }
                } catch (Throwable th) {
                    OffsetCoordinator.LOG.error("Committed failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, th);
                    this.exception = new ClientException(th.getMessage());
                }
                if (this.exception != null) {
                    stop();
                    break;
                }
                waitSignal(j, OffsetCoordinator.MIN_ASYNC_COMMIT_INTERVAL_MS);
            }
            OffsetCoordinator.LOG.info("Backend Commit task stop, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:shaded/com/aliyun/datahub/clientlibrary/consumer/OffsetCoordinator$LoopFlag.class */
    public interface LoopFlag {
        boolean proceed();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OffsetCoordinator(String str, String str2, String str3, ConsumerConfig consumerConfig, String str4) {
        this.config = consumerConfig;
        this.projectName = str;
        this.topicName = str2;
        this.subId = str3;
        this.consumerId = str4;
        this.clientProvider = consumerConfig.getClientProvider();
        this.commitTask.start();
        if (this.config.isAutoCommit()) {
            return;
        }
        this.checkpointTask = new CheckpointTask();
        this.checkpointTask.start();
    }

    public void setShardCoordinator(ShardCoordinator shardCoordinator) {
        this.shardCoordinator = shardCoordinator;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void checkCommitTask() {
        if (this.commitTask.checkRunning()) {
            return;
        }
        this.commitTask = new CommitTask();
        this.commitTask.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void check(String str, RecordKeyImpl recordKeyImpl) {
        if (this.config.isAutoCommit()) {
            return;
        }
        OffsetWrapper heldOffset = this.offsetManager.getHeldOffset(str);
        if (heldOffset == null) {
            LOG.error("setOffset failed, invalid status, offset not match shard assigned, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, ShardId: {}", this.projectName, this.topicName, this.subId, this.consumerId, str);
            throw new ClientException("Invalid status, offset not match shard assigned");
        }
        heldOffset.getQueue().add(recordKeyImpl);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ack(String str, RecordKeyImpl recordKeyImpl) {
        if (this.config.isAutoCommit()) {
            updateOffset(str, recordKeyImpl.getOffset());
        }
    }

    void updateOffset(String str, SubscriptionOffset subscriptionOffset) {
        OffsetWrapper heldOffset = this.offsetManager.getHeldOffset(str);
        if (heldOffset != null) {
            heldOffset.updateAckedOffset(subscriptionOffset);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, Offset> openAndGetOffsets(List<String> list) {
        HashMap hashMap = new HashMap();
        List<String> list2 = (List) list.stream().filter(str -> {
            return !this.offsetManager.getHeldOffsets().keySet().contains(str);
        }).collect(Collectors.toList());
        if (!list2.isEmpty()) {
            Map<String, SubscriptionOffset> openSubscriptionSession = openSubscriptionSession(list2);
            synchronized (this.updateLock) {
                this.offsetManager.addOffsets(openSubscriptionSession);
            }
        }
        for (String str2 : list) {
            hashMap.put(str2, getNextOffset(this.offsetManager.getHeldOffset(str2).getAckedOffset()));
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void releaseOffsets(Assignment assignment) {
        List<String> releaseShardList = assignment.getReleaseShardList(this.offsetManager.getHeldOffsets().keySet());
        if (releaseShardList.isEmpty()) {
            return;
        }
        synchronized (this.updateLock) {
            this.offsetManager.release(releaseShardList);
        }
        LOG.info("start releasing offsets, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Shards: {}", this.projectName, this.topicName, this.subId, this.consumerId, releaseShardList.toString());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeAllOffsets() {
        synchronized (this.updateLock) {
            this.offsetManager.removeAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<String> getReadEndShardList(Map<String, Long> map) {
        if (map.isEmpty()) {
            return Collections.emptyList();
        }
        List<String> readEndShardList = this.offsetManager.getReadEndShardList(map);
        if (readEndShardList.size() < map.size()) {
            this.commitTask.triggerUpdate();
        }
        return readEndShardList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setConsumerId(String str) {
        this.consumerId = str;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getLastCommitTime() {
        return this.lastCommitTime;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        if (this.commitTask != null) {
            this.commitTask.stop();
        }
        if (this.checkpointTask != null) {
            this.checkpointTask.stop();
            if (this.checkpointTask.waitStopped(Constants.CLOSE_TIMEOUT_MS)) {
                this.offsetManager.getHeldOffsets().entrySet().parallelStream().forEach(entry -> {
                    flushAckedOffset((String) entry.getKey(), (OffsetWrapper) entry.getValue(), null);
                });
                this.offsetManager.getReleasingOffsets().entrySet().parallelStream().forEach(entry2 -> {
                    flushAckedOffset((String) entry2.getKey(), (OffsetWrapper) entry2.getValue(), null);
                });
            }
        }
        commitReleased();
        try {
            commit(true);
        } catch (DatahubClientException e) {
            LOG.warn("Commit offset by `close` failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", this.projectName, this.topicName, this.subId, this.consumerId, e);
        }
    }

    private Map<String, SubscriptionOffset> openSubscriptionSession(final List<String> list) {
        return new ExceptionRetryer<Map<String, SubscriptionOffset>>() { // from class: shaded.com.aliyun.datahub.clientlibrary.consumer.OffsetCoordinator.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer
            public Map<String, SubscriptionOffset> func() {
                return OffsetCoordinator.this.clientProvider.getClient().openSubscriptionSession(OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, list).getOffsets();
            }

            @Override // shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer
            protected void onExceedRetryLimit(DatahubClientException datahubClientException) {
                OffsetCoordinator.LOG.error("Init offset failed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}", OffsetCoordinator.this.projectName, OffsetCoordinator.this.topicName, OffsetCoordinator.this.subId, OffsetCoordinator.this.consumerId, datahubClientException);
            }

            @Override // shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer
            protected boolean isTerminated() {
                return OffsetCoordinator.this.closed.get();
            }
        }.run(3, 1000L);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean commit(boolean z) {
        Map<String, SubscriptionOffset> ackedOffsets;
        boolean commit;
        synchronized (this.commitLock) {
            synchronized (this.updateLock) {
                ackedOffsets = this.offsetManager.getAckedOffsets();
            }
            try {
                commit = commit(ackedOffsets, z);
            } catch (SubscriptionSessionInvalidException e) {
                synchronized (this.updateLock) {
                    if (this.shardCoordinator == null || this.shardCoordinator.getAssignment().getShards().containsAll(ackedOffsets.keySet())) {
                        throw e;
                    }
                    LOG.info("Offset session changed for removed shard, will retry, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Shards: {}", this.projectName, this.topicName, this.subId, this.consumerId, ackedOffsets.keySet().toString());
                    releaseOffsets(this.shardCoordinator.getAssignment());
                    return false;
                }
            }
        }
        return commit;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean commitReleased() {
        Map<String, SubscriptionOffset> offsetsToRelease;
        synchronized (this.commitLock) {
            synchronized (this.updateLock) {
                offsetsToRelease = this.offsetManager.getOffsetsToRelease();
            }
            if (offsetsToRelease.isEmpty()) {
                return true;
            }
            try {
                commit(offsetsToRelease, false);
                List<String> releasedShardList = this.offsetManager.getReleasedShardList();
                if (!releasedShardList.isEmpty()) {
                    LOG.info("Offset released, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Shards: {}", this.projectName, this.topicName, this.subId, this.consumerId, releasedShardList.toString());
                }
            } catch (DatahubClientException e) {
                if (!ExceptionRetryer.canSwallow(e)) {
                    LOG.info("Remove releasing offset, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Shards: {}", this.projectName, this.topicName, this.subId, this.consumerId, offsetsToRelease.keySet().toString(), e);
                    this.offsetManager.removeReleased(offsetsToRelease.keySet());
                }
            } catch (Throwable th) {
                LOG.info("Commit releasing offset fail, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Shards: {}", this.projectName, this.topicName, this.subId, this.consumerId, offsetsToRelease.keySet().toString(), th);
            }
            return offsetsToRelease.isEmpty();
        }
    }

    private boolean commit(Map<String, SubscriptionOffset> map, boolean z) {
        if (map == null || map.isEmpty()) {
            return false;
        }
        this.clientProvider.getClient().commitSubscriptionOffset(this.projectName, this.topicName, this.subId, map);
        synchronized (this.updateLock) {
            this.offsetManager.setCommittedOffsets(map);
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (z || currentTimeMillis - this.lastLogTime > 60000) {
            synchronized (this.updateLock) {
                LOG.info("Committed, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, Offset: {}", this.projectName, this.topicName, this.subId, this.consumerId, this.offsetManager.getCommittedOffsets().toString());
            }
            this.lastLogTime = currentTimeMillis;
        }
        this.lastCommitTime = currentTimeMillis;
        return true;
    }

    private Offset getNextOffset(SubscriptionOffset subscriptionOffset) {
        long sequence = subscriptionOffset.getSequence();
        long timestamp = subscriptionOffset.getTimestamp();
        if (timestamp <= 0) {
            timestamp = 0;
            sequence = 0;
        } else if (sequence > -1) {
            sequence++;
        }
        return new Offset(sequence, timestamp);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void flushAckedOffset(String str, OffsetWrapper offsetWrapper, LoopFlag loopFlag) {
        RecordKeyImpl peek;
        if (offsetWrapper == null) {
            return;
        }
        synchronized (offsetWrapper.getCheckLock()) {
            ConcurrentLinkedQueue<RecordKeyImpl> queue = offsetWrapper.getQueue();
            RecordKeyImpl recordKeyImpl = null;
            while (true) {
                if ((loopFlag != null && !loopFlag.proceed()) || (peek = queue.peek()) == null) {
                    break;
                }
                if (peek.isReady()) {
                    recordKeyImpl = peek;
                    queue.poll();
                } else {
                    long currentTimeMillis = System.currentTimeMillis();
                    if (peek.getOffset().getSequence() == offsetWrapper.getLastUnReadySeq()) {
                        long lastCheckTime = currentTimeMillis - offsetWrapper.getLastCheckTime();
                        if (lastCheckTime > 60000 * (offsetWrapper.getWarningTimes().get() + 1)) {
                            LOG.warn("Record is not ack for {}ms, Project: {}, Topic: {}, SubId: {}, ConsumerId: {}, ShardId: {}, Seq: {}", Long.valueOf(lastCheckTime), this.projectName, this.topicName, this.subId, this.consumerId, str, Long.valueOf(peek.getOffset().getSequence()));
                            offsetWrapper.getWarningTimes().incrementAndGet();
                        }
                    } else {
                        offsetWrapper.setLastUnReadySeq(peek.getOffset().getSequence());
                        offsetWrapper.setLastCheckTime(currentTimeMillis);
                        offsetWrapper.getWarningTimes().set(0L);
                    }
                }
            }
            if (recordKeyImpl != null) {
                offsetWrapper.updateAckedOffset(recordKeyImpl.getOffset());
            }
        }
    }
}
