package com.threerings.messaging.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import com.samskivert.util.Logger;
import com.threerings.coin.server.persist.CoinTransaction;
import com.threerings.messaging.ConnectedListener;
import com.threerings.messaging.DestinationAddress;
import com.threerings.messaging.InMessage;
import com.threerings.messaging.MessageListener;
import com.threerings.messaging.OutMessage;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/* loaded from: input_file:com/threerings/messaging/amqp/AMQPConnectedListener.class */
/**
 * A {@link ConnectedListener} that consumes messages from a single AMQP queue and hands each one
 * to a {@link MessageListener}.
 *
 * <p>{@link #connect()} declares a durable direct exchange, declares and binds the queue, starts
 * a consumer on it and spawns a single worker thread that pulls deliveries and dispatches them.
 * {@link #close()} stops the worker, cancels the consumer and closes the channel.
 *
 * <p>{@code connect()}, {@code close()}, {@code isClosed()} and the channel operations performed
 * by {@link AMQPInMessage} all synchronize on this listener instance.
 */
public class AMQPConnectedListener implements ConnectedListener {
    // NOTE(review): the logger is created for AMQPMessageConnection rather than this class; this
    // looks like a copy/paste slip but is left untouched because changing it would move these
    // messages to a different log category — confirm before changing.
    private static final Logger logger = Logger.getLogger(AMQPMessageConnection.class);

    /** Name of the queue that is declared, bound and consumed from. */
    protected final String _queueName;

    /** Supplies the exchange name and the routing key used to bind the queue. */
    protected final DestinationAddress _addr;

    /** Receives every message delivered to the queue. */
    protected final MessageListener _listener;

    /** Creates the channel used by each call to {@link #connect()}. */
    protected final ChannelFactory _channelFactory;

    /** Runs the consumer loop; replaced on every {@link #connect()}. */
    protected ExecutorService _service;

    /** Channel of the current (or most recent) connection; null until the first connect. */
    protected Channel _channel;

    /** Tag of the active consumer, or null when no consumer is registered. */
    protected String _consumerTag;

    /**
     * Set by {@link #close()} to stop the consumer loop. Volatile because the loop reads it from
     * the worker thread without holding this listener's monitor.
     */
    protected volatile boolean _shutdown = false;

    /**
     * A message delivered from the queue. Acknowledgements and replies are sent over the enclosing
     * listener's current channel.
     */
    protected class AMQPInMessage implements InMessage {
        protected final AMQP.BasicProperties _props;
        protected final long _deliveryTag;
        protected final byte[] _body;

        /**
         * @param bArr the raw message body
         * @param basicProperties the AMQP properties the message was delivered with
         * @param j the delivery tag used to acknowledge the message
         */
        public AMQPInMessage(byte[] bArr, AMQP.BasicProperties basicProperties, long j) {
            _body = bArr;
            _props = basicProperties;
            _deliveryTag = j;
        }

        /**
         * Acknowledges this single delivery on the listener's channel.
         *
         * @throws IOException if the acknowledgement cannot be sent
         */
        @Override // com.threerings.messaging.InMessage
        public void ack() throws IOException {
            synchronized (AMQPConnectedListener.this) {
                _channel.basicAck(_deliveryTag, false);
            }
        }

        /** Returns the raw message body (the array itself, not a copy). */
        @Override // com.threerings.messaging.InMessage
        public byte[] getBody() {
            return _body;
        }

        /**
         * Publishes {@code outMessage} to the listener's exchange, routed to this message's
         * {@code replyTo} and carrying this message's properties.
         *
         * @throws IOException if the reply cannot be published
         */
        @Override // com.threerings.messaging.InMessage
        public void reply(OutMessage outMessage) throws IOException {
            synchronized (AMQPConnectedListener.this) {
                _channel.basicPublish(_addr.exchange, _props.replyTo, _props, outMessage.encodeMessage());
            }
        }
    }

    /**
     * Creates the listener and immediately tries to connect. A failure to connect is logged, not
     * thrown; the listener is then left unconnected and {@link #connect()} may be called again.
     *
     * @param str name of the queue to consume from
     * @param destinationAddress exchange and routing key to bind the queue to
     * @param messageListener receiver of incoming messages
     * @param channelFactory source of channels
     */
    public AMQPConnectedListener(String str, DestinationAddress destinationAddress, MessageListener messageListener, ChannelFactory channelFactory) {
        _queueName = str;
        _addr = destinationAddress;
        _listener = messageListener;
        _channelFactory = channelFactory;
        try {
            connect();
        } catch (IOException e) {
            logger.warning("Could not listen on queue.", new Object[]{"queueName", str, "address", destinationAddress.toString(), e});
        }
    }

    /**
     * Stops consuming: flags the consumer loop to stop, shuts the executor down, cancels the
     * consumer (if one is registered) and closes the channel (if it is open).
     *
     * @throws IOException if cancelling the consumer or closing the channel fails
     */
    @Override // com.threerings.messaging.ConnectedListener, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        _shutdown = true;
        if (_service != null) {
            _service.shutdown();
        }

        // Forget the tag up front so isClosed() stays true even if a later connect() fails midway.
        final String consumerTag = _consumerTag;
        _consumerTag = null;

        // The guard must not consult isClosed(): _shutdown was just set, so it is always true here
        // and would make the cancel/close below unreachable.
        if (_channel == null || !_channel.isOpen()) {
            return;
        }
        try {
            if (consumerTag != null) {
                _channel.basicCancel(consumerTag);
            }
        } finally {
            // NOTE(review): CoinTransaction.ELEVATOR looks like a decompiler artifact standing in
            // for a numeric AMQP close code — confirm the intended constant.
            _channel.close(CoinTransaction.ELEVATOR, "Consumer closed.");
        }
    }

    /** Returns true if {@link #close()} has been called or no consumer is registered. */
    @Override // com.threerings.messaging.ConnectedListener
    public synchronized boolean isClosed() {
        return _shutdown || _consumerTag == null;
    }

    /**
     * (Re)connects to the queue. Any previous connection is closed first; then a new channel is
     * created, the exchange and queue are declared and bound, a consumer with manual
     * acknowledgement is registered, and a fresh single-thread executor starts the consumer loop.
     *
     * @throws IOException if closing the previous connection or any channel operation fails
     */
    public synchronized void connect() throws IOException {
        close();
        _shutdown = false;
        _channel = _channelFactory.createChannel();
        _channel.exchangeDeclare(_addr.exchange, "direct", true);
        _channel.queueDeclare(_queueName, true);
        _channel.queueBind(_queueName, _addr.exchange, _addr.getRoutingKey());

        final QueueingConsumer consumer = new QueueingConsumer(_channel) { // from class: com.threerings.messaging.amqp.AMQPConnectedListener.1
            @Override
            public void handleCancelOk(String str) {
                super.handleCancelOk(str);
                AMQPConnectedListener.logger.info("Canceled consumer for queue: " + AMQPConnectedListener.this._queueName, new Object[0]);
            }

            @Override
            public void handleConsumeOk(String str) {
                super.handleConsumeOk(str);
                AMQPConnectedListener.this._consumerTag = str;
                AMQPConnectedListener.logger.info("Consume OK", new Object[]{"queue", AMQPConnectedListener.this._queueName, "consumerTag", str});
            }

            @Override
            public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
                super.handleShutdownSignal(str, shutdownSignalException);
                AMQPConnectedListener.logger.info("Disconnected from queue: " + AMQPConnectedListener.this._queueName, new Object[0]);
            }
        };

        _consumerTag = _channel.basicConsume(_queueName, false, consumer);
        _service = Executors.newSingleThreadExecutor();
        _service.execute(new Runnable() { // from class: com.threerings.messaging.amqp.AMQPConnectedListener.2
            @Override // java.lang.Runnable
            public void run() {
                consumeLoop(consumer);
            }
        });
    }

    /**
     * Pulls deliveries from {@code consumer} and dispatches them to the listener until the
     * listener is shut down or the consumer's channel goes away. Failures while handling a single
     * message are logged and do not stop the loop.
     */
    private void consumeLoop(QueueingConsumer consumer) {
        while (!_shutdown) {
            QueueingConsumer.Delivery delivery = null;
            try {
                delivery = consumer.nextDelivery();
                logger.info("Message received from RabbitMQ", new Object[]{"queue", _queueName});
                _listener.received(new AMQPInMessage(delivery.getBody(), delivery.getProperties(), delivery.getEnvelope().getDeliveryTag()));
            } catch (ShutdownSignalException e) {
                if (delivery == null) {
                    // Raised by nextDelivery(): the channel is gone. Looping again would only
                    // raise the same signal immediately, so stop; connect() starts a new loop.
                    logger.info("Channel shut down; consumer loop stopping.", new Object[]{"queueName", _queueName});
                    return;
                }
                // Raised while handling the message; report it and keep consuming.
                logger.warning("Shutdown signal raised while processing a RabbitMQ message.", new Object[]{"queueName", _queueName, "delivery", delivery, e});
            } catch (InterruptedException e2) {
                if (!_shutdown) {
                    logger.warning("Interrupted while a waiting for messages from RabbitMQ message.", new Object[]{"queueName", _queueName, e2});
                }
            } catch (Throwable th) {
                logger.warning("Something nasty happened while processing a RabbitMQ message.", new Object[]{"queueName", _queueName, "delivery", delivery, th});
            }
        }
    }
}
