package org.springframework.kafka.listener;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer.class */
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    private final ConsumerFactory<K, V> consumerFactory;
    private final TopicPartitionInitialOffset[] topicPartitions;
    private KafkaMessageListenerContainer<K, V>.ListenerConsumer listenerConsumer;
    private ListenableFuture<?> listenerConsumerFuture;
    private GenericMessageListener<?> listener;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$ListenerConsumer.class */
    public final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekAware.ConsumerSeekCallback {
        private final Log logger;
        private final ContainerProperties containerProperties;
        private final OffsetCommitCallback commitCallback;
        private final Consumer<K, V> consumer;
        private final Map<String, Map<Integer, Long>> offsets;
        private final MessageListener<K, V> listener;
        private final BatchMessageListener<K, V> batchListener;
        private final ListenerType listenerType;
        private final boolean isConsumerAwareListener;
        private final boolean isBatchListener;
        private final boolean autoCommit;
        private final boolean isManualAck;
        private final boolean isManualImmediateAck;
        private final boolean isAnyManualAck;
        private final boolean isRecordAck;
        private final boolean isBatchAck;
        private final BlockingQueue<ConsumerRecord<K, V>> acks;
        private final BlockingQueue<TopicPartitionInitialOffset> seeks;
        private final ErrorHandler errorHandler;
        private final BatchErrorHandler batchErrorHandler;
        private volatile Map<TopicPartition, OffsetMetadata> definedPartitions;
        private volatile Collection<TopicPartition> assignedPartitions;
        private int count;
        private long last;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$ListenerConsumer$ConsumerAcknowledgment.class */
        public final class ConsumerAcknowledgment implements Acknowledgment {
            private final ConsumerRecord<K, V> record;

            private ConsumerAcknowledgment(ConsumerRecord<K, V> consumerRecord) {
                this.record = consumerRecord;
            }

            @Override // org.springframework.kafka.support.Acknowledgment
            public void acknowledge() {
                Assert.state(ListenerConsumer.this.isAnyManualAck, "A manual ackmode is required for an acknowledging listener");
                ListenerConsumer.this.processAck(this.record);
            }

            public String toString() {
                return "Acknowledgment for " + this.record;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$ListenerConsumer$ConsumerBatchAcknowledgment.class */
        public final class ConsumerBatchAcknowledgment implements Acknowledgment {
            private final List<ConsumerRecord<K, V>> records;

            private ConsumerBatchAcknowledgment(List<ConsumerRecord<K, V>> list) {
                this.records = new LinkedList(list);
            }

            @Override // org.springframework.kafka.support.Acknowledgment
            public void acknowledge() {
                Assert.state(ListenerConsumer.this.isAnyManualAck, "A manual ackmode is required for an acknowledging listener");
                Iterator it = ListenerConsumer.this.getHighestOffsetRecords(this.records).iterator();
                while (it.hasNext()) {
                    ListenerConsumer.this.processAck((ConsumerRecord) it.next());
                }
            }

            public String toString() {
                return "Acknowledgment for " + this.records;
            }
        }

        private ListenerConsumer(GenericMessageListener<?> genericMessageListener, ListenerType listenerType) {
            this.logger = LogFactory.getLog(ListenerConsumer.class);
            this.containerProperties = KafkaMessageListenerContainer.this.getContainerProperties();
            this.commitCallback = this.containerProperties.getCommitCallback() != null ? this.containerProperties.getCommitCallback() : new LoggingCommitCallback();
            this.offsets = new HashMap();
            this.autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
            this.isManualAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL);
            this.isManualImmediateAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
            this.isRecordAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.RECORD);
            this.isBatchAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.BATCH);
            this.acks = new LinkedBlockingQueue();
            this.seeks = new LinkedBlockingQueue();
            Assert.state((this.isAnyManualAck && this.autoCommit) ? false : true, "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
            Consumer<K, V> createConsumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
            ConsumerRebalanceListener createRebalanceListener = createRebalanceListener(createConsumer);
            if (KafkaMessageListenerContainer.this.topicPartitions != null) {
                List<TopicPartitionInitialOffset> asList = Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
                this.definedPartitions = new HashMap(asList.size());
                for (TopicPartitionInitialOffset topicPartitionInitialOffset : asList) {
                    this.definedPartitions.put(topicPartitionInitialOffset.topicPartition(), new OffsetMetadata(topicPartitionInitialOffset.initialOffset(), topicPartitionInitialOffset.isRelativeToCurrent()));
                }
                createConsumer.assign(new ArrayList(this.definedPartitions.keySet()));
            } else if (this.containerProperties.getTopicPattern() != null) {
                createConsumer.subscribe(this.containerProperties.getTopicPattern(), createRebalanceListener);
            } else {
                createConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), createRebalanceListener);
            }
            this.consumer = createConsumer;
            GenericErrorHandler<?> genericErrorHandler = this.containerProperties.getGenericErrorHandler();
            if (genericMessageListener instanceof BatchMessageListener) {
                this.listener = null;
                this.batchListener = (BatchMessageListener) genericMessageListener;
                this.isBatchListener = true;
            } else {
                if (!(genericMessageListener instanceof MessageListener)) {
                    throw new IllegalArgumentException("Listener must be one of 'MessageListener', 'BatchMessageListener', or the variants that are consumer aware and/or Acknowledging not " + genericMessageListener.getClass().getName());
                }
                this.listener = (MessageListener) genericMessageListener;
                this.batchListener = null;
                this.isBatchListener = false;
            }
            this.listenerType = listenerType;
            this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || listenerType.equals(ListenerType.CONSUMER_AWARE);
            if (this.isBatchListener) {
                validateErrorHandler(true);
                this.errorHandler = new LoggingErrorHandler();
                this.batchErrorHandler = genericErrorHandler == null ? new BatchLoggingErrorHandler() : (BatchErrorHandler) genericErrorHandler;
            } else {
                validateErrorHandler(false);
                this.errorHandler = genericErrorHandler == null ? new LoggingErrorHandler() : (ErrorHandler) genericErrorHandler;
                this.batchErrorHandler = new BatchLoggingErrorHandler();
            }
            Assert.state((this.isBatchListener && this.isRecordAck) ? false : true, "Cannot use AckMode.RECORD with a batch listener");
        }

        public ConsumerRebalanceListener createRebalanceListener(final Consumer<K, V> consumer) {
            return new ConsumerRebalanceListener() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer.1
                final ConsumerRebalanceListener userListener;
                final ConsumerAwareRebalanceListener consumerAwareListener;

                {
                    this.userListener = KafkaMessageListenerContainer.this.getContainerProperties().getConsumerRebalanceListener();
                    this.consumerAwareListener = this.userListener instanceof ConsumerAwareRebalanceListener ? (ConsumerAwareRebalanceListener) this.userListener : null;
                }

                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsRevokedBeforeCommit(consumer, collection);
                    } else {
                        this.userListener.onPartitionsRevoked(collection);
                    }
                    ListenerConsumer.this.commitPendingAcks();
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsRevokedAfterCommit(consumer, collection);
                    }
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    ListenerConsumer.this.assignedPartitions = collection;
                    if (!ListenerConsumer.this.autoCommit) {
                        HashMap hashMap = new HashMap();
                        for (TopicPartition topicPartition : collection) {
                            hashMap.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition)));
                        }
                        if (ListenerConsumer.this.logger.isDebugEnabled()) {
                            ListenerConsumer.this.logger.debug("Committing on assignment: " + hashMap);
                        }
                        if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {
                            ListenerConsumer.this.consumer.commitSync(hashMap);
                        } else {
                            ListenerConsumer.this.consumer.commitAsync(hashMap, KafkaMessageListenerContainer.this.getContainerProperties().getCommitCallback());
                        }
                    }
                    if (ListenerConsumer.this.listener instanceof ConsumerSeekAware) {
                        ListenerConsumer.this.seekPartitions(collection, false);
                    }
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsAssigned(consumer, collection);
                    } else {
                        this.userListener.onPartitionsAssigned(collection);
                    }
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void seekPartitions(Collection<TopicPartition> collection, boolean z) {
            Map<TopicPartition, Long> hashMap = new HashMap<>();
            for (TopicPartition topicPartition : collection) {
                hashMap.put(topicPartition, Long.valueOf(this.consumer.position(topicPartition)));
            }
            ConsumerSeekAware.ConsumerSeekCallback consumerSeekCallback = new ConsumerSeekAware.ConsumerSeekCallback() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer.2
                @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
                public void seek(String str, int i, long j) {
                    ListenerConsumer.this.consumer.seek(new TopicPartition(str, i), j);
                }

                @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
                public void seekToBeginning(String str, int i) {
                    ListenerConsumer.this.consumer.seekToBeginning(Collections.singletonList(new TopicPartition(str, i)));
                }

                @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
                public void seekToEnd(String str, int i) {
                    ListenerConsumer.this.consumer.seekToEnd(Collections.singletonList(new TopicPartition(str, i)));
                }
            };
            if (z) {
                ((ConsumerSeekAware) this.listener).onIdleContainer(hashMap, consumerSeekCallback);
            } else {
                ((ConsumerSeekAware) this.listener).onPartitionsAssigned(hashMap, consumerSeekCallback);
            }
        }

        private void validateErrorHandler(boolean z) {
            GenericErrorHandler<?> genericErrorHandler = this.containerProperties.getGenericErrorHandler();
            if (this.errorHandler == null) {
                return;
            }
            Type[] genericInterfaces = genericErrorHandler.getClass().getGenericInterfaces();
            boolean z2 = false;
            int length = genericInterfaces.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                Type type = genericInterfaces[i];
                if (type.equals(ErrorHandler.class)) {
                    z2 = !z;
                } else {
                    if (type.equals(BatchErrorHandler.class)) {
                        z2 = z;
                        break;
                    }
                    i++;
                }
            }
            Assert.state(z2, "Error handler is not compatible with the message listener, expecting an instance of " + (z ? "BatchErrorHandler" : "ErrorHandler") + " not " + genericErrorHandler.getClass().getName());
        }

        public boolean isLongLived() {
            return true;
        }

        public void run() {
            if (this.listener instanceof ConsumerSeekAware) {
                ((ConsumerSeekAware) this.listener).registerSeekCallback(this);
            }
            this.count = 0;
            this.last = System.currentTimeMillis();
            if (KafkaMessageListenerContainer.this.isRunning() && this.definedPartitions != null) {
                initPartitionsIfNeeded();
            }
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis;
            while (KafkaMessageListenerContainer.this.isRunning()) {
                try {
                    if (!this.autoCommit) {
                        processCommits();
                    }
                    processSeeks();
                    ConsumerRecords<K, V> poll = this.consumer.poll(this.containerProperties.getPollTimeout());
                    if (poll != null && this.logger.isDebugEnabled()) {
                        this.logger.debug("Received: " + poll.count() + " records");
                    }
                    if (poll != null && poll.count() > 0) {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            currentTimeMillis = System.currentTimeMillis();
                        }
                        invokeListener(poll);
                    } else if (this.containerProperties.getIdleEventInterval() != null) {
                        long currentTimeMillis2 = System.currentTimeMillis();
                        if (currentTimeMillis2 > currentTimeMillis + this.containerProperties.getIdleEventInterval().longValue() && currentTimeMillis2 > j + this.containerProperties.getIdleEventInterval().longValue()) {
                            KafkaMessageListenerContainer.this.publishIdleContainerEvent(currentTimeMillis2 - currentTimeMillis, this.isConsumerAwareListener ? this.consumer : null);
                            j = currentTimeMillis2;
                            if (this.listener instanceof ConsumerSeekAware) {
                                seekPartitions(KafkaMessageListenerContainer.this.getAssignedPartitions(), true);
                            }
                        }
                    }
                } catch (WakeupException e) {
                } catch (Exception e2) {
                    if (this.containerProperties.getGenericErrorHandler() != null) {
                        this.containerProperties.getGenericErrorHandler().handle(e2, null);
                    } else {
                        this.logger.error("Container exception", e2);
                    }
                }
            }
            commitPendingAcks();
            try {
                this.consumer.unsubscribe();
            } catch (WakeupException e3) {
            }
            this.consumer.close();
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Consumer stopped");
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void commitPendingAcks() {
            processCommits();
            if (this.offsets.size() > 0) {
                commitIfNecessary();
            }
        }

        private void handleAcks() {
            ConsumerRecord<K, V> poll = this.acks.poll();
            while (true) {
                ConsumerRecord<K, V> consumerRecord = poll;
                if (consumerRecord == null) {
                    return;
                }
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Ack: " + consumerRecord);
                }
                processAck(consumerRecord);
                poll = this.acks.poll();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void processAck(ConsumerRecord<K, V> consumerRecord) {
            if (!this.isManualImmediateAck) {
                addOffset(consumerRecord);
            } else {
                try {
                    ackImmediate(consumerRecord);
                } catch (WakeupException e) {
                }
            }
        }

        private void ackImmediate(ConsumerRecord<K, V> consumerRecord) {
            Map singletonMap = Collections.singletonMap(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Committing: " + singletonMap);
            }
            if (this.containerProperties.isSyncCommits()) {
                this.consumer.commitSync(singletonMap);
            } else {
                this.consumer.commitAsync(singletonMap, this.commitCallback);
            }
        }

        private void invokeListener(ConsumerRecords<K, V> consumerRecords) {
            if (this.isBatchListener) {
                invokeBatchListener(consumerRecords);
            } else {
                invokeRecordListener(consumerRecords);
            }
        }

        private void invokeBatchListener(ConsumerRecords<K, V> consumerRecords) {
            LinkedList linkedList = new LinkedList();
            Iterator it = consumerRecords.iterator();
            while (it.hasNext()) {
                linkedList.add(it.next());
            }
            if (linkedList.size() > 0) {
                try {
                    switch (this.listenerType) {
                        case ACKNOWLEDGING_CONSUMER_AWARE:
                            this.batchListener.onMessage(linkedList, this.isAnyManualAck ? new ConsumerBatchAcknowledgment(linkedList) : null, this.consumer);
                            break;
                        case ACKNOWLEDGING:
                            this.batchListener.onMessage((BatchMessageListener<K, V>) linkedList, this.isAnyManualAck ? new ConsumerBatchAcknowledgment(linkedList) : null);
                            break;
                        case CONSUMER_AWARE:
                            this.batchListener.onMessage((BatchMessageListener<K, V>) linkedList, (Consumer<?, ?>) this.consumer);
                            break;
                        case SIMPLE:
                            this.batchListener.onMessage(linkedList);
                            break;
                    }
                    if (!this.isAnyManualAck && !this.autoCommit) {
                        Iterator<ConsumerRecord<K, V>> it2 = getHighestOffsetRecords(linkedList).iterator();
                        while (it2.hasNext()) {
                            this.acks.put(it2.next());
                        }
                    }
                } catch (Exception e) {
                    if (this.containerProperties.isAckOnError() && !this.autoCommit) {
                        Iterator<ConsumerRecord<K, V>> it3 = getHighestOffsetRecords(linkedList).iterator();
                        while (it3.hasNext()) {
                            this.acks.add(it3.next());
                        }
                    }
                    try {
                        this.batchErrorHandler.handle(e, consumerRecords);
                    } catch (Error e2) {
                        this.logger.error("Error handler threw an error", e2);
                        throw e2;
                    } catch (Exception e3) {
                        this.logger.error("Error handler threw an exception", e3);
                    }
                }
            }
        }

        /* JADX WARN: Failed to find 'out' block for switch in B:9:0x004b. Please report as an issue. */
        /* JADX WARN: Removed duplicated region for block: B:18:0x00d2 A[Catch: Exception -> 0x00e7, TryCatch #0 {Exception -> 0x00e7, blocks: (B:8:0x0040, B:9:0x004b, B:10:0x0068, B:14:0x0074, B:15:0x0082, B:26:0x008e, B:28:0x009f, B:30:0x00ab, B:31:0x00b9, B:34:0x00c1, B:16:0x00cb, B:18:0x00d2, B:20:0x00d9), top: B:7:0x0040 }] */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        private void invokeRecordListener(org.apache.kafka.clients.consumer.ConsumerRecords<K, V> r9) {
            /*
                Method dump skipped, instructions count: 316
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer.invokeRecordListener(org.apache.kafka.clients.consumer.ConsumerRecords):void");
        }

        private void processCommits() {
            handleAcks();
            this.count += this.acks.size();
            AbstractMessageListenerContainer.AckMode ackMode = this.containerProperties.getAckMode();
            if (this.isManualImmediateAck) {
                return;
            }
            if (!this.isManualAck) {
                updatePendingOffsets();
            }
            boolean z = this.count >= this.containerProperties.getAckCount();
            if (this.isManualAck || this.isBatchAck || this.isRecordAck || (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT) && z)) {
                if (this.logger.isDebugEnabled() && ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT)) {
                    this.logger.debug("Committing in AckMode.COUNT because count " + this.count + " exceeds configured limit of " + this.containerProperties.getAckCount());
                }
                commitIfNecessary();
                this.count = 0;
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            boolean z2 = currentTimeMillis - this.last > this.containerProperties.getAckTime();
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.TIME) && z2) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Committing in AckMode.TIME because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
                }
                commitIfNecessary();
                this.last = currentTimeMillis;
                return;
            }
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME)) {
                if (z2 || z) {
                    if (this.logger.isDebugEnabled()) {
                        if (z2) {
                            this.logger.debug("Committing in AckMode.COUNT_TIME because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
                        } else {
                            this.logger.debug("Committing in AckMode.COUNT_TIME because count " + this.count + " exceeds configured limit of" + this.containerProperties.getAckCount());
                        }
                    }
                    commitIfNecessary();
                    this.last = currentTimeMillis;
                    this.count = 0;
                }
            }
        }

        private void processSeeks() {
            TopicPartitionInitialOffset poll = this.seeks.poll();
            while (true) {
                TopicPartitionInitialOffset topicPartitionInitialOffset = poll;
                if (topicPartitionInitialOffset == null) {
                    return;
                }
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Seek: " + topicPartitionInitialOffset);
                }
                try {
                    TopicPartitionInitialOffset.SeekPosition position = topicPartitionInitialOffset.getPosition();
                    if (position == null) {
                        this.consumer.seek(topicPartitionInitialOffset.topicPartition(), topicPartitionInitialOffset.initialOffset().longValue());
                    } else if (position.equals(TopicPartitionInitialOffset.SeekPosition.BEGINNING)) {
                        this.consumer.seekToBeginning(Collections.singletonList(topicPartitionInitialOffset.topicPartition()));
                    } else {
                        this.consumer.seekToEnd(Collections.singletonList(topicPartitionInitialOffset.topicPartition()));
                    }
                } catch (Exception e) {
                    this.logger.error("Exception while seeking " + topicPartitionInitialOffset, e);
                }
                poll = this.seeks.poll();
            }
        }

        private void initPartitionsIfNeeded() {
            for (Map.Entry<TopicPartition, OffsetMetadata> entry : this.definedPartitions.entrySet()) {
                TopicPartition key = entry.getKey();
                OffsetMetadata value = entry.getValue();
                Long l = value.offset;
                if (l != null) {
                    long longValue = l.longValue();
                    if (l.longValue() < 0) {
                        if (!value.relativeToCurrent) {
                            this.consumer.seekToEnd(Arrays.asList(key));
                        }
                        longValue = Math.max(0L, this.consumer.position(key) + l.longValue());
                    } else if (value.relativeToCurrent) {
                        longValue = this.consumer.position(key) + l.longValue();
                    }
                    try {
                        this.consumer.seek(key, longValue);
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Reset " + key + " to offset " + longValue);
                        }
                    } catch (Exception e) {
                        this.logger.error("Failed to set initial offset for " + key + " at " + longValue + ". Position is " + this.consumer.position(key), e);
                    }
                }
            }
        }

        private void updatePendingOffsets() {
            ConsumerRecord<K, V> poll = this.acks.poll();
            while (true) {
                ConsumerRecord<K, V> consumerRecord = poll;
                if (consumerRecord == null) {
                    return;
                }
                addOffset(consumerRecord);
                poll = this.acks.poll();
            }
        }

        private void addOffset(ConsumerRecord<K, V> consumerRecord) {
            this.offsets.computeIfAbsent(consumerRecord.topic(), str -> {
                return new HashMap();
            }).compute(Integer.valueOf(consumerRecord.partition()), (num, l) -> {
                return Long.valueOf(l == null ? consumerRecord.offset() : Math.max(l.longValue(), consumerRecord.offset()));
            });
        }

        private void commitIfNecessary() {
            HashMap hashMap = new HashMap();
            for (Map.Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
                for (Map.Entry<Integer, Long> entry2 : entry.getValue().entrySet()) {
                    hashMap.put(new TopicPartition(entry.getKey(), entry2.getKey().intValue()), new OffsetAndMetadata(entry2.getValue().longValue() + 1));
                }
            }
            this.offsets.clear();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Commit list: " + hashMap);
            }
            if (hashMap.isEmpty()) {
                return;
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Committing: " + hashMap);
            }
            try {
                if (this.containerProperties.isSyncCommits()) {
                    this.consumer.commitSync(hashMap);
                } else {
                    this.consumer.commitAsync(hashMap, this.commitCallback);
                }
            } catch (WakeupException e) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Woken up during commit");
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(List<ConsumerRecord<K, V>> list) {
            HashMap hashMap = new HashMap();
            list.forEach(consumerRecord -> {
                hashMap.compute(Integer.valueOf(consumerRecord.partition()), (num, consumerRecord) -> {
                    if (consumerRecord != null && consumerRecord.offset() <= consumerRecord.offset()) {
                        return consumerRecord;
                    }
                    return consumerRecord;
                });
            });
            return hashMap.values();
        }

        @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
        public void seek(String str, int i, long j) {
            this.seeks.add(new TopicPartitionInitialOffset(str, i, Long.valueOf(j)));
        }

        @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
        public void seekToBeginning(String str, int i) {
            this.seeks.add(new TopicPartitionInitialOffset(str, i, TopicPartitionInitialOffset.SeekPosition.BEGINNING));
        }

        @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
        public void seekToEnd(String str, int i) {
            this.seeks.add(new TopicPartitionInitialOffset(str, i, TopicPartitionInitialOffset.SeekPosition.END));
        }
    }

    /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$LoggingCommitCallback.class */
    private static final class LoggingCommitCallback implements OffsetCommitCallback {
        private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class);

        private LoggingCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            if (exc != null) {
                logger.error("Commit failed for " + map, exc);
            } else if (logger.isDebugEnabled()) {
                logger.debug("Commits for " + map + " completed");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$OffsetMetadata.class */
    public static final class OffsetMetadata {
        private final Long offset;
        private final boolean relativeToCurrent;

        private OffsetMetadata(Long l, boolean z) {
            this.offset = l;
            this.relativeToCurrent = z;
        }
    }

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) {
        this(consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
    }

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionInitialOffset... topicPartitionInitialOffsetArr) {
        super(containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        this.consumerFactory = consumerFactory;
        if (topicPartitionInitialOffsetArr != null) {
            this.topicPartitions = (TopicPartitionInitialOffset[]) Arrays.copyOf(topicPartitionInitialOffsetArr, topicPartitionInitialOffsetArr.length);
        } else {
            this.topicPartitions = containerProperties.getTopicPartitions();
        }
    }

    public Collection<TopicPartition> getAssignedPartitions() {
        if (((ListenerConsumer) this.listenerConsumer).definedPartitions != null) {
            return Collections.unmodifiableCollection(((ListenerConsumer) this.listenerConsumer).definedPartitions.keySet());
        }
        if (((ListenerConsumer) this.listenerConsumer).assignedPartitions != null) {
            return Collections.unmodifiableCollection(((ListenerConsumer) this.listenerConsumer).assignedPartitions);
        }
        return null;
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStart() {
        Object obj;
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();
        if (!this.consumerFactory.isAutoCommit()) {
            AbstractMessageListenerContainer.AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT) || ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AbstractMessageListenerContainer.AckMode.TIME) || ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME)) && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000L);
            }
        }
        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (containerProperties.getConsumerTaskExecutor() == null) {
            containerProperties.setConsumerTaskExecutor(new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-"));
        }
        Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
        this.listener = (GenericMessageListener) messageListener;
        ListenerType determineListenerType = ListenerUtils.determineListenerType(this.listener);
        if (this.listener instanceof DelegatingMessageListener) {
            Object obj2 = this.listener;
            while (true) {
                obj = obj2;
                if (!(obj instanceof DelegatingMessageListener)) {
                    break;
                } else {
                    obj2 = ((DelegatingMessageListener) obj).getDelegate();
                }
            }
            determineListenerType = ListenerUtils.determineListenerType(obj);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, determineListenerType);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStop(final Runnable runnable) {
        if (isRunning()) {
            this.listenerConsumerFuture.addCallback(new ListenableFutureCallback<Object>() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.1
                public void onFailure(Throwable th) {
                    KafkaMessageListenerContainer.this.logger.error("Error while stopping the container: ", th);
                    if (runnable != null) {
                        runnable.run();
                    }
                }

                public void onSuccess(Object obj) {
                    if (KafkaMessageListenerContainer.this.logger.isDebugEnabled()) {
                        KafkaMessageListenerContainer.this.logger.debug(KafkaMessageListenerContainer.this + " stopped normally");
                    }
                    if (runnable != null) {
                        runnable.run();
                    }
                }
            });
            setRunning(false);
            ((ListenerConsumer) this.listenerConsumer).consumer.wakeup();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void publishIdleContainerEvent(long j, Consumer<?, ?> consumer) {
        if (getApplicationEventPublisher() != null) {
            getApplicationEventPublisher().publishEvent(new ListenerContainerIdleEvent(this, j, getBeanName(), getAssignedPartitions(), consumer));
        }
    }

    public String toString() {
        return "KafkaMessageListenerContainer [id=" + getBeanName() + ", topicPartitions=" + getAssignedPartitions() + "]";
    }
}
