/*
 * Decompiled with CFR 0.152.
 */
package com.threerings.messaging.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import com.samskivert.util.Logger;
import com.threerings.messaging.ConnectedListener;
import com.threerings.messaging.DestinationAddress;
import com.threerings.messaging.InMessage;
import com.threerings.messaging.MessageListener;
import com.threerings.messaging.OutMessage;
import com.threerings.messaging.amqp.AMQPMessageConnection;
import com.threerings.messaging.amqp.ChannelFactory;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AMQPConnectedListener
implements ConnectedListener {
    private static final Logger logger = Logger.getLogger(AMQPMessageConnection.class);
    protected final String _queueName;
    protected final DestinationAddress _addr;
    protected final MessageListener _listener;
    protected final ChannelFactory _channelFactory;
    protected ExecutorService _service;
    protected Channel _channel;
    protected String _consumerTag;
    protected boolean _shutdown;

    public AMQPConnectedListener(String queueName, DestinationAddress addr, MessageListener listener, ChannelFactory channelFactory) {
        this._queueName = queueName;
        this._addr = addr;
        this._listener = listener;
        this._channelFactory = channelFactory;
        this._shutdown = false;
        try {
            this.connect();
        }
        catch (IOException ioe) {
            logger.warning((Object)"Could not listen on queue.", new Object[]{"queueName", queueName, "address", addr.toString(), ioe});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void close() throws IOException {
        this._shutdown = true;
        if (this._service != null) {
            this._service.shutdown();
        }
        if (this._channel != null && !this.isClosed() && this._channel.isOpen()) {
            try {
                this._channel.basicCancel(this._consumerTag);
            }
            finally {
                this._channel.close(200, "Consumer closed.");
            }
        }
    }

    public synchronized boolean isClosed() {
        return this._shutdown || this._consumerTag == null;
    }

    public synchronized void connect() throws IOException {
        this.close();
        this._shutdown = false;
        this._channel = this._channelFactory.createChannel();
        this._channel.exchangeDeclare(this._addr.exchange, "direct", true);
        this._channel.queueDeclare(this._queueName, true);
        this._channel.queueBind(this._queueName, this._addr.exchange, this._addr.getRoutingKey());
        final QueueingConsumer consumer = new QueueingConsumer(this._channel){

            public void handleCancelOk(String consumerTag) {
                super.handleCancelOk(consumerTag);
                logger.info((Object)("Canceled consumer for queue: " + AMQPConnectedListener.this._queueName), new Object[0]);
            }

            public void handleConsumeOk(String consumerTag) {
                super.handleConsumeOk(consumerTag);
                AMQPConnectedListener.this._consumerTag = consumerTag;
                logger.info((Object)"Consume OK", new Object[]{"queue", AMQPConnectedListener.this._queueName, "consumerTag", consumerTag});
            }

            public void handleShutdownSignal(String consumerTag, ShutdownSignalException ex) {
                super.handleShutdownSignal(consumerTag, ex);
                logger.info((Object)("Disconnected from queue: " + AMQPConnectedListener.this._queueName), new Object[0]);
            }
        };
        this._consumerTag = this._channel.basicConsume(this._queueName, false, (Consumer)consumer);
        this._service = Executors.newSingleThreadExecutor();
        this._service.execute(new Runnable(){

            public void run() {
                while (!AMQPConnectedListener.this._shutdown) {
                    QueueingConsumer.Delivery delivery = null;
                    try {
                        delivery = consumer.nextDelivery();
                        logger.info((Object)"Message received from RabbitMQ", new Object[]{"queue", AMQPConnectedListener.this._queueName});
                        AMQPConnectedListener.this._listener.received(new AMQPInMessage(delivery.getBody(), delivery.getProperties(), delivery.getEnvelope().getDeliveryTag()));
                    }
                    catch (InterruptedException iex) {
                        if (AMQPConnectedListener.this._shutdown) continue;
                        logger.warning((Object)"Interrupted while a waiting for messages from RabbitMQ message.", new Object[]{"queueName", AMQPConnectedListener.this._queueName, iex});
                    }
                    catch (ShutdownSignalException sse) {
                    }
                    catch (Throwable ex) {
                        logger.warning((Object)"Something nasty happened while processing a RabbitMQ message.", new Object[]{"queueName", AMQPConnectedListener.this._queueName, "delivery", delivery, ex});
                    }
                }
            }
        });
    }

    protected class AMQPInMessage
    implements InMessage {
        protected final AMQP.BasicProperties _props;
        protected final long _deliveryTag;
        protected final byte[] _body;

        public AMQPInMessage(byte[] body, AMQP.BasicProperties props, long deliveryTag) {
            this._body = body;
            this._props = props;
            this._deliveryTag = deliveryTag;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void ack() throws IOException {
            AMQPConnectedListener aMQPConnectedListener = AMQPConnectedListener.this;
            synchronized (aMQPConnectedListener) {
                AMQPConnectedListener.this._channel.basicAck(this._deliveryTag, false);
            }
        }

        public byte[] getBody() {
            return this._body;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void reply(OutMessage message) throws IOException {
            AMQPConnectedListener aMQPConnectedListener = AMQPConnectedListener.this;
            synchronized (aMQPConnectedListener) {
                AMQPConnectedListener.this._channel.basicPublish(AMQPConnectedListener.this._addr.exchange, this._props.replyTo, this._props, message.encodeMessage());
            }
        }
    }
}

