/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.springframework.amqp.AmqpApplicationContextClosedException;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.ActiveObjectCounter;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.backoff.BackOffExecution;

public class DirectMessageListenerContainer
extends AbstractMessageListenerContainer {
    private static final int DEFAULT_MONITOR_INTERVAL = 10000;
    private static final int DEFAULT_ACK_TIMEOUT = 20000;
    protected final List<SimpleConsumer> consumers = new LinkedList<SimpleConsumer>();
    private final List<SimpleConsumer> consumersToRestart = new LinkedList<SimpleConsumer>();
    private final MultiValueMap<String, SimpleConsumer> consumersByQueue = new LinkedMultiValueMap();
    private final ActiveObjectCounter<SimpleConsumer> cancellationLock = new ActiveObjectCounter();
    private TaskScheduler taskScheduler;
    private boolean taskSchedulerSet;
    private long monitorInterval = 10000L;
    private int messagesPerAck;
    private long ackTimeout = 20000L;
    private volatile boolean started;
    private volatile boolean aborted;
    private volatile boolean hasStopped;
    private volatile CountDownLatch startedLatch = new CountDownLatch(1);
    private volatile int consumersPerQueue = 1;
    private volatile ScheduledFuture<?> consumerMonitorTask;
    private volatile long lastAlertAt;
    private volatile long lastRestartAttempt;

    public DirectMessageListenerContainer() {
        this.setMissingQueuesFatal(false);
    }

    public DirectMessageListenerContainer(ConnectionFactory connectionFactory) {
        this.setConnectionFactory(connectionFactory);
        this.setMissingQueuesFatal(false);
    }

    public void setConsumersPerQueue(int consumersPerQueue) {
        if (this.isRunning()) {
            this.adjustConsumers(consumersPerQueue);
        }
        this.consumersPerQueue = consumersPerQueue;
    }

    @Override
    public final void setExclusive(boolean exclusive) {
        Assert.isTrue((!exclusive || this.consumersPerQueue == 1 ? 1 : 0) != 0, (String)"When the consumer is exclusive, the consumers per queue must be 1");
        super.setExclusive(exclusive);
    }

    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
        this.taskSchedulerSet = true;
    }

    public void setMonitorInterval(long monitorInterval) {
        this.monitorInterval = monitorInterval;
    }

    @Override
    public void setQueueNames(String ... queueName) {
        Assert.state((!this.isRunning() ? 1 : 0) != 0, (String)"Cannot set queue names while running, use add/remove");
        super.setQueueNames(queueName);
    }

    @Override
    public final void setMissingQueuesFatal(boolean missingQueuesFatal) {
        super.setMissingQueuesFatal(missingQueuesFatal);
    }

    public void setMessagesPerAck(int messagesPerAck) {
        this.messagesPerAck = messagesPerAck;
    }

    public void setAckTimeout(long ackTimeout) {
        this.ackTimeout = ackTimeout;
    }

    @Override
    public void addQueueNames(String ... queueNames) {
        try {
            this.addQueues(Arrays.stream(queueNames));
        }
        catch (AmqpIOException e) {
            throw new AmqpIOException("Failed to add " + Arrays.asList(queueNames), e.getCause());
        }
        super.addQueueNames(queueNames);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addQueues(Stream<String> queueNameStream) {
        if (this.isRunning()) {
            Object object = this.consumersMonitor;
            synchronized (object) {
                this.checkStartState();
                Set<String> current = this.getQueueNamesAsSet();
                queueNameStream.forEach(queue -> {
                    if (current.contains(queue)) {
                        this.logger.warn((Object)("Queue " + queue + " is already configured for this container: " + this + ", ignoring add"));
                    } else {
                        this.consumeFromQueue((String)queue);
                    }
                });
            }
        }
    }

    @Override
    public boolean removeQueueNames(String ... queueNames) {
        this.removeQueues(Arrays.stream(queueNames));
        return super.removeQueueNames(queueNames);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeQueues(Stream<String> queueNames) {
        if (this.isRunning()) {
            Object object = this.consumersMonitor;
            synchronized (object) {
                this.checkStartState();
                queueNames.map(arg_0 -> this.consumersByQueue.remove(arg_0)).filter(Objects::nonNull).flatMap(Collection::stream).forEach(this::cancelConsumer);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void adjustConsumers(int newCount) {
        Object object = this.consumersMonitor;
        synchronized (object) {
            this.checkStartState();
            this.consumersToRestart.clear();
            for (String queue : this.getQueueNames()) {
                while (this.consumersByQueue.get((Object)queue) == null || ((List)this.consumersByQueue.get((Object)queue)).size() < newCount) {
                    this.doConsumeFromQueue(queue);
                }
                List consumerList = (List)this.consumersByQueue.get((Object)queue);
                if (consumerList == null || consumerList.size() <= newCount) continue;
                int currentCount = consumerList.size();
                for (int i = newCount; i < currentCount; ++i) {
                    SimpleConsumer consumer = (SimpleConsumer)((Object)consumerList.remove(i));
                    this.cancelConsumer(consumer);
                }
            }
        }
    }

    private void checkStartState() {
        if (!this.isRunning()) {
            try {
                Assert.state((boolean)this.startedLatch.await(60L, TimeUnit.SECONDS), (String)"Container is not started - cannot adjust queues");
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AmqpException("Interrupted waiting for start", (Throwable)e);
            }
        }
    }

    @Override
    protected void doInitialize() throws Exception {
        if (this.taskScheduler == null) {
            ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
            threadPoolTaskScheduler.setThreadNamePrefix(this.getBeanName() + "-consumerMonitor-");
            threadPoolTaskScheduler.afterPropertiesSet();
            this.taskScheduler = threadPoolTaskScheduler;
        }
        if (this.messagesPerAck > 0) {
            Assert.state((!this.isChannelTransacted() ? 1 : 0) != 0, (String)"'messagesPerAck' is not allowed with transactions");
        }
    }

    @Override
    protected void doStart() throws Exception {
        if (!this.started) {
            this.actualStart();
        }
    }

    @Override
    protected void doStop() {
        super.doStop();
        if (!this.taskSchedulerSet && this.taskScheduler != null) {
            ((ThreadPoolTaskScheduler)this.taskScheduler).shutdown();
            this.taskScheduler = null;
        }
    }

    protected void actualStart() throws Exception {
        this.aborted = false;
        this.hasStopped = false;
        if (this.getPrefetchCount() < this.messagesPerAck) {
            this.setPrefetchCount(this.messagesPerAck);
        }
        super.doStart();
        String[] queueNames = this.getQueueNames();
        this.checkMissingQueues(queueNames);
        if (this.getTaskExecutor() == null) {
            this.afterPropertiesSet();
        }
        long idleEventInterval = this.getIdleEventInterval();
        if (this.taskScheduler == null) {
            this.afterPropertiesSet();
        }
        if (idleEventInterval > 0L && this.monitorInterval > idleEventInterval) {
            this.monitorInterval = idleEventInterval / 2L;
        }
        if (this.getFailedDeclarationRetryInterval() < this.monitorInterval) {
            this.monitorInterval = this.getFailedDeclarationRetryInterval();
        }
        this.lastRestartAttempt = System.currentTimeMillis();
        this.consumerMonitorTask = this.taskScheduler.scheduleAtFixedRate(() -> {
            List<SimpleConsumer> consumersToCancel;
            long now = System.currentTimeMillis();
            if (idleEventInterval > 0L && now - this.getLastReceive() > idleEventInterval && now - this.lastAlertAt > idleEventInterval) {
                this.publishIdleContainerEvent(now - this.getLastReceive());
                this.lastAlertAt = now;
            }
            Object object = this.consumersMonitor;
            synchronized (object) {
                consumersToCancel = this.consumers.stream().filter(c -> {
                    boolean open = c.getChannel().isOpen();
                    if (open && this.messagesPerAck > 1) {
                        try {
                            ((SimpleConsumer)c).ackIfNecessary(now);
                        }
                        catch (IOException e) {
                            this.logger.error((Object)"Exception while sending delayed ack", (Throwable)e);
                        }
                    }
                    return !open;
                }).collect(Collectors.toList());
            }
            consumersToCancel.forEach(c -> {
                block2: {
                    try {
                        RabbitUtils.closeMessageConsumer(c.getChannel(), Collections.singletonList(c.getConsumerTag()), this.isChannelTransacted());
                    }
                    catch (Exception e) {
                        if (!this.logger.isDebugEnabled()) break block2;
                        this.logger.debug((Object)("Error closing consumer " + (Object)c), (Throwable)e);
                    }
                }
                this.logger.error((Object)("Consumer canceled - channel closed " + (Object)c));
                c.cancelConsumer("Consumer " + (Object)c + " channel closed");
            });
            if (this.lastRestartAttempt + this.getFailedDeclarationRetryInterval() < now) {
                object = this.consumersMonitor;
                synchronized (object) {
                    ArrayList<SimpleConsumer> restartableConsumers = new ArrayList<SimpleConsumer>(this.consumersToRestart);
                    this.consumersToRestart.clear();
                    if (this.started) {
                        if (restartableConsumers.size() > 0) {
                            this.doRedeclareElementsIfNecessary();
                        }
                        for (SimpleConsumer consumer : restartableConsumers) {
                            if (this.logger.isDebugEnabled() && restartableConsumers.size() > 0) {
                                this.logger.debug((Object)("Attempting to restart consumer " + (Object)((Object)consumer)));
                            }
                            try {
                                this.doConsumeFromQueue(consumer.getQueue());
                            }
                            catch (AmqpConnectException | AmqpIOException e) {
                                this.logger.error((Object)"Cannot connect to server", e);
                                if (!(e.getCause() instanceof AmqpApplicationContextClosedException)) break;
                                this.logger.error((Object)"Application context is closed, terminating");
                                this.taskScheduler.schedule(this::stop, new Date());
                                break;
                            }
                        }
                        this.lastRestartAttempt = now;
                    }
                }
            }
            this.processMonitorTask();
        }, this.monitorInterval);
        if (queueNames.length > 0) {
            this.doRedeclareElementsIfNecessary();
            this.getTaskExecutor().execute(() -> {
                Object object = this.consumersMonitor;
                synchronized (object) {
                    if (this.hasStopped) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug((Object)"Consumer start aborted - container stopping");
                        }
                    } else {
                        BackOffExecution backOffExecution = this.getRecoveryBackOff().start();
                        while (!this.started && this.isRunning()) {
                            this.cancellationLock.reset();
                            try {
                                for (String queue : queueNames) {
                                    this.consumeFromQueue(queue);
                                }
                            }
                            catch (AmqpConnectException | AmqpIOException e) {
                                long nextBackOff = backOffExecution.nextBackOff();
                                if (nextBackOff < 0L || e.getCause() instanceof AmqpApplicationContextClosedException) {
                                    this.aborted = true;
                                    this.shutdown();
                                    this.logger.error((Object)"Failed to start container - fatal error or backOffs exhausted", e);
                                    this.taskScheduler.schedule(this::stop, new Date());
                                    break;
                                }
                                this.logger.error((Object)("Error creating consumer; retrying in " + nextBackOff), e);
                                this.doShutdown();
                                try {
                                    Thread.sleep(nextBackOff);
                                }
                                catch (InterruptedException e1) {
                                    Thread.currentThread().interrupt();
                                }
                                continue;
                            }
                            this.started = true;
                            this.startedLatch.countDown();
                        }
                    }
                }
            });
        } else {
            this.started = true;
            this.startedLatch.countDown();
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info((Object)("Container initialized for queues: " + Arrays.asList(queueNames)));
        }
    }

    protected void doRedeclareElementsIfNecessary() {
        String routingLookupKey = this.getRoutingLookupKey();
        if (routingLookupKey != null) {
            SimpleResourceHolder.bind(this.getRoutingConnectionFactory(), routingLookupKey);
        }
        try {
            this.redeclareElementsIfNecessary();
        }
        catch (Exception e) {
            this.logger.error((Object)"Failed to redeclare elements", (Throwable)e);
        }
        finally {
            if (routingLookupKey != null) {
                SimpleResourceHolder.unbind(this.getRoutingConnectionFactory());
            }
        }
    }

    protected void processMonitorTask() {
    }

    private void checkMissingQueues(String[] queueNames) {
        if (this.isMissingQueuesFatal()) {
            RabbitAdmin checkAdmin = this.getRabbitAdmin();
            if (checkAdmin == null) {
                checkAdmin = new RabbitAdmin(this.getConnectionFactory());
            }
            for (String queue : queueNames) {
                Properties queueProperties = checkAdmin.getQueueProperties(queue);
                if (queueProperties != null || !this.isMissingQueuesFatal()) continue;
                throw new IllegalStateException("At least one of the configured queues is missing");
            }
        }
    }

    private void consumeFromQueue(String queue) {
        List list = (List)this.consumersByQueue.get((Object)queue);
        if (CollectionUtils.isEmpty((Collection)list)) {
            for (int i = 0; i < this.consumersPerQueue; ++i) {
                this.doConsumeFromQueue(queue);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doConsumeFromQueue(String queue) {
        if (!this.isActive()) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Consume from queue " + queue + " ignore, container stopping"));
            }
            return;
        }
        String routingLookupKey = this.getRoutingLookupKey();
        if (routingLookupKey != null) {
            SimpleResourceHolder.bind(this.getRoutingConnectionFactory(), routingLookupKey);
        }
        Connection connection = null;
        try {
            connection = this.getConnectionFactory().createConnection();
        }
        catch (Exception e) {
            this.consumersToRestart.add(new SimpleConsumer(null, null, queue));
            throw new AmqpConnectException(e);
        }
        finally {
            if (routingLookupKey != null) {
                SimpleResourceHolder.unbind(this.getRoutingConnectionFactory());
            }
        }
        Channel channel = null;
        SimpleConsumer consumer = null;
        try {
            channel = connection.createChannel(this.isChannelTransacted());
            channel.basicQos(this.getPrefetchCount());
            consumer = new SimpleConsumer(connection, channel, queue);
            channel.queueDeclarePassive(queue);
            consumer.consumerTag = channel.basicConsume(queue, this.getAcknowledgeMode().isAutoAck(), this.getConsumerTagStrategy() != null ? this.getConsumerTagStrategy().createConsumerTag(queue) : "", false, this.isExclusive(), this.getConsumerArguments(), (Consumer)consumer);
        }
        catch (AmqpApplicationContextClosedException e) {
            throw new AmqpConnectException((Exception)((Object)e));
        }
        catch (IOException e) {
            RabbitUtils.closeChannel(channel);
            RabbitUtils.closeConnection(connection);
            if (e.getCause() instanceof ShutdownSignalException && e.getCause().getMessage().contains("in exclusive use")) {
                this.getExclusiveConsumerExceptionLogger().log(this.logger, "Exclusive consumer failure", e.getCause());
                this.publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Queue not present or basicConsume failed, scheduling consumer " + (Object)((Object)consumer) + " for restart"));
            }
            this.consumersToRestart.add(consumer);
            consumer = null;
        }
        Object object = this.consumersMonitor;
        synchronized (object) {
            if (consumer != null) {
                this.cancellationLock.add(consumer);
                this.consumers.add(consumer);
                this.consumersByQueue.add((Object)queue, (Object)consumer);
                if (this.logger.isInfoEnabled()) {
                    this.logger.info((Object)((Object)((Object)consumer) + " started"));
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doShutdown() {
        boolean waitForConsumers = false;
        Object object = this.consumersMonitor;
        synchronized (object) {
            if (this.started || this.aborted) {
                this.actualShutDown();
                waitForConsumers = true;
            }
        }
        if (waitForConsumers) {
            try {
                if (this.cancellationLock.await(this.getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
                    this.logger.info((Object)"Successfully waited for consumers to finish.");
                } else {
                    this.logger.info((Object)"Consumers not finished.");
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.logger.warn((Object)"Interrupted waiting for consumers. Continuing with shutdown.");
            }
            finally {
                this.startedLatch = new CountDownLatch(1);
                this.started = false;
                this.aborted = false;
                this.hasStopped = true;
            }
        }
    }

    private void actualShutDown() {
        Assert.state((this.getTaskExecutor() != null ? 1 : 0) != 0, (String)"Cannot shut down if not initialized");
        this.logger.debug((Object)"Shutting down");
        new LinkedList<SimpleConsumer>(this.consumers).forEach(this::cancelConsumer);
        this.consumers.clear();
        this.consumersByQueue.clear();
        this.logger.debug((Object)"All consumers canceled");
        if (this.consumerMonitorTask != null) {
            this.consumerMonitorTask.cancel(true);
            this.consumerMonitorTask = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cancelConsumer(SimpleConsumer consumer) {
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Canceling " + (Object)((Object)consumer)));
            }
            SimpleConsumer simpleConsumer = consumer;
            synchronized (simpleConsumer) {
                consumer.canceled = true;
                if (this.messagesPerAck > 1) {
                    consumer.ackIfNecessary(0L);
                }
            }
            consumer.getChannel().basicCancel(consumer.getConsumerTag());
        }
        catch (IOException e) {
            this.logger.error((Object)("Failed to cancel consumer: " + (Object)((Object)consumer)), (Throwable)e);
        }
        finally {
            this.consumers.remove((Object)consumer);
            this.consumerRemoved(consumer);
        }
    }

    protected void consumerRemoved(SimpleConsumer consumer) {
    }

    final class SimpleConsumer
    extends DefaultConsumer {
        private final Log logger;
        private final Connection connection;
        private final String queue;
        private final boolean ackRequired;
        private final ConnectionFactory connectionFactory;
        private final PlatformTransactionManager transactionManager;
        private final TransactionAttribute transactionAttribute;
        private final boolean isRabbitTxManager;
        private final int messagesPerAck;
        private final long ackTimeout;
        private int pendingAcks;
        private long lastAck;
        private long lastDeliveryComplete;
        private long deliveryTag;
        private volatile String consumerTag;
        private volatile int epoch;
        private volatile TransactionTemplate transactionTemplate;
        private volatile boolean canceled;

        private SimpleConsumer(Connection connection, Channel channel, String queue) {
            super(channel);
            this.logger = DirectMessageListenerContainer.this.logger;
            this.connectionFactory = DirectMessageListenerContainer.this.getConnectionFactory();
            this.transactionManager = DirectMessageListenerContainer.this.getTransactionManager();
            this.transactionAttribute = DirectMessageListenerContainer.this.getTransactionAttribute();
            this.isRabbitTxManager = this.transactionManager instanceof RabbitTransactionManager;
            this.messagesPerAck = DirectMessageListenerContainer.this.messagesPerAck;
            this.ackTimeout = DirectMessageListenerContainer.this.ackTimeout;
            this.lastDeliveryComplete = this.lastAck = System.currentTimeMillis();
            this.connection = connection;
            this.queue = queue;
            this.ackRequired = !DirectMessageListenerContainer.this.getAcknowledgeMode().isAutoAck() && !DirectMessageListenerContainer.this.getAcknowledgeMode().isManual();
        }

        private String getQueue() {
            return this.queue;
        }

        public String getConsumerTag() {
            return this.consumerTag;
        }

        int getEpoch() {
            return this.epoch;
        }

        int incrementAndGetEpoch() {
            return ++this.epoch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            MessageProperties messageProperties = DirectMessageListenerContainer.this.getMessagePropertiesConverter().toMessageProperties(properties, envelope, "UTF-8");
            messageProperties.setConsumerTag(consumerTag);
            messageProperties.setConsumerQueue(this.queue);
            Message message = new Message(body, messageProperties);
            long deliveryTag = envelope.getDeliveryTag();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)((Object)((Object)this) + " received " + message));
            }
            DirectMessageListenerContainer.this.updateLastReceive();
            if (this.transactionManager != null) {
                try {
                    if (this.isRabbitTxManager) {
                        ConsumerChannelRegistry.registerConsumerChannel(this.getChannel(), this.connectionFactory);
                    }
                    if (this.transactionTemplate == null) {
                        this.transactionTemplate = new TransactionTemplate(this.transactionManager, (TransactionDefinition)this.transactionAttribute);
                    }
                    this.transactionTemplate.execute(s -> {
                        RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(this.getChannel(), false), this.connectionFactory, true);
                        if (resourceHolder != null) {
                            resourceHolder.addDeliveryTag(this.getChannel(), deliveryTag);
                        }
                        try {
                            this.callExecuteListener(message, deliveryTag);
                        }
                        catch (RuntimeException e1) {
                            DirectMessageListenerContainer.this.prepareHolderForRollback(resourceHolder, e1);
                            throw e1;
                        }
                        catch (Throwable e2) {
                            throw new AbstractMessageListenerContainer.WrappedTransactionException(e2);
                        }
                        return null;
                    });
                    return;
                }
                catch (Throwable e) {
                    if (!(e instanceof AbstractMessageListenerContainer.WrappedTransactionException) || !(e.getCause() instanceof Error)) return;
                    throw (Error)e.getCause();
                }
                finally {
                    if (this.isRabbitTxManager) {
                        ConsumerChannelRegistry.unRegisterConsumerChannel();
                    }
                }
            }
            try {
                this.callExecuteListener(message, deliveryTag);
                return;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }

        private void callExecuteListener(Message message, long deliveryTag) throws Exception {
            boolean channelLocallyTransacted = DirectMessageListenerContainer.this.isChannelLocallyTransacted();
            try {
                DirectMessageListenerContainer.this.executeListener(this.getChannel(), message);
                this.handleAck(deliveryTag, channelLocallyTransacted);
            }
            catch (ImmediateAcknowledgeAmqpException e) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("User requested ack for failed delivery: " + deliveryTag));
                }
                this.handleAck(deliveryTag, channelLocallyTransacted);
            }
            catch (Exception e) {
                if (DirectMessageListenerContainer.this.causeChainHasImmediateAcknowledgeAmqpException(e)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("User requested ack for failed delivery: " + deliveryTag));
                    }
                    this.handleAck(deliveryTag, channelLocallyTransacted);
                }
                this.logger.error((Object)"Failed to invoke listener", (Throwable)e);
                if (this.transactionManager != null) {
                    if (this.transactionAttribute.rollbackOn((Throwable)e)) {
                        RabbitResourceHolder resourceHolder = (RabbitResourceHolder)((Object)TransactionSynchronizationManager.getResource((Object)DirectMessageListenerContainer.this.getConnectionFactory()));
                        if (resourceHolder == null) {
                            this.rollback(deliveryTag, e);
                        }
                        throw e;
                    }
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("No rollback for " + e));
                    }
                }
                this.rollback(deliveryTag, e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleAck(long deliveryTag, boolean channelLocallyTransacted) throws IOException {
            boolean isLocallyTransacted = channelLocallyTransacted || DirectMessageListenerContainer.this.isChannelTransacted() && TransactionSynchronizationManager.getResource((Object)this.connectionFactory) == null;
            try {
                if (this.ackRequired) {
                    if (this.messagesPerAck > 1) {
                        SimpleConsumer simpleConsumer = this;
                        synchronized (simpleConsumer) {
                            this.deliveryTag = deliveryTag;
                            ++this.pendingAcks;
                            this.lastDeliveryComplete = System.currentTimeMillis();
                            this.ackIfNecessary(this.lastDeliveryComplete);
                        }
                    } else if (!DirectMessageListenerContainer.this.isChannelTransacted() || isLocallyTransacted) {
                        this.getChannel().basicAck(deliveryTag, false);
                    }
                }
                if (isLocallyTransacted) {
                    RabbitUtils.commitIfNecessary(this.getChannel());
                }
            }
            catch (Exception e) {
                this.logger.error((Object)"Error acking", (Throwable)e);
            }
        }

        private synchronized void ackIfNecessary(long now) throws IOException {
            if (this.pendingAcks >= this.messagesPerAck || this.pendingAcks > 0 && (now - this.lastAck > this.ackTimeout || this.canceled)) {
                this.sendAck(now);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void rollback(long deliveryTag, Exception e) {
            if (DirectMessageListenerContainer.this.isChannelTransacted()) {
                RabbitUtils.rollbackIfNecessary(this.getChannel());
            }
            if (this.ackRequired) {
                try {
                    if (this.messagesPerAck > 1) {
                        SimpleConsumer simpleConsumer = this;
                        synchronized (simpleConsumer) {
                            if (this.pendingAcks > 0) {
                                this.sendAck(System.currentTimeMillis());
                            }
                        }
                    }
                    this.getChannel().basicNack(deliveryTag, true, RabbitUtils.shouldRequeue(DirectMessageListenerContainer.this.isDefaultRequeueRejected(), e, this.logger));
                }
                catch (IOException e1) {
                    this.logger.error((Object)"Failed to nack message", (Throwable)e1);
                }
            }
            if (DirectMessageListenerContainer.this.isChannelTransacted()) {
                RabbitUtils.commitIfNecessary(this.getChannel());
            }
        }

        protected synchronized void sendAck(long now) throws IOException {
            this.getChannel().basicAck(this.deliveryTag, true);
            this.lastAck = now;
            this.pendingAcks = 0;
        }

        public void handleConsumeOk(String consumerTag) {
            super.handleConsumeOk(consumerTag);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("New " + (Object)((Object)this) + " consumeOk"));
            }
        }

        public void handleCancelOk(String consumerTag) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("CancelOk " + (Object)((Object)this)));
            }
            this.finalizeConsumer();
        }

        public void handleCancel(String consumerTag) throws IOException {
            this.logger.error((Object)("Consumer canceled - queue deleted? " + (Object)((Object)this)));
            this.cancelConsumer("Consumer " + (Object)((Object)this) + " canceled");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void cancelConsumer(String eventMessage) {
            DirectMessageListenerContainer.this.publishConsumerFailedEvent(eventMessage, true, null);
            Object object = DirectMessageListenerContainer.this.consumersMonitor;
            synchronized (object) {
                List list = (List)DirectMessageListenerContainer.this.consumersByQueue.get((Object)this.queue);
                if (list != null) {
                    list.remove((Object)this);
                }
                DirectMessageListenerContainer.this.consumers.remove((Object)this);
                DirectMessageListenerContainer.this.consumersToRestart.add(this);
            }
            this.finalizeConsumer();
        }

        private void finalizeConsumer() {
            RabbitUtils.setPhysicalCloseRequired(true);
            RabbitUtils.closeChannel(this.getChannel());
            RabbitUtils.closeConnection(this.connection);
            DirectMessageListenerContainer.this.cancellationLock.release(this);
            DirectMessageListenerContainer.this.consumerRemoved(this);
        }

        public String toString() {
            return "SimpleConsumer [queue=" + this.queue + ", consumerTag=" + this.consumerTag + " identity=" + ObjectUtils.getIdentityHexString((Object)((Object)this)) + "]";
        }
    }
}

