/*
 * Decompiled with CFR 0.152.
 */
package com.threerings.messaging.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionParameters;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.samskivert.util.Logger;
import com.threerings.messaging.AddressedMessageListener;
import com.threerings.messaging.MessageConnection;
import com.threerings.messaging.MessageSender;
import com.threerings.messaging.amqp.AMQPConnectedListener;
import com.threerings.messaging.amqp.AMQPMessageConfig;
import com.threerings.messaging.amqp.AMQPMessageSender;
import com.threerings.messaging.amqp.ChannelFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AMQPMessageConnection
implements MessageConnection {
    protected static final Logger logger = Logger.getLogger(AMQPMessageConnection.class);
    protected static final int CLOSE_TIMEOUT = 2000;
    protected volatile Connection _conn;
    protected final AMQPMessageConfig _config;
    protected final AMQPMessageSender _sender;
    protected final ChannelFactory _channelFactory;
    protected final ScheduledExecutorService _reconnectService;
    protected final Map<AddressedMessageListener, AMQPConnectedListener> _listeners = new ConcurrentHashMap<AddressedMessageListener, AMQPConnectedListener>();

    public AMQPMessageConnection(AMQPMessageConfig config) {
        this._config = config;
        this._channelFactory = new ChannelFactory(){

            public Channel createChannel() throws IOException {
                if (AMQPMessageConnection.this._conn != null) {
                    try {
                        return AMQPMessageConnection.this._conn.createChannel();
                    }
                    catch (ShutdownSignalException shutdownSignalException) {
                        // empty catch block
                    }
                }
                AMQPMessageConnection.this.connect();
                return AMQPMessageConnection.this._conn.createChannel();
            }
        };
        this._sender = new AMQPMessageSender(this._channelFactory);
        this._reconnectService = Executors.newSingleThreadScheduledExecutor();
        try {
            this.connect();
        }
        catch (IOException ioe) {
            logger.warning((Object)"Cannot connect to RabbitMQ server.  Will retry every 5 seconds.", new Object[]{ioe});
            this._reconnectService.schedule(new AttemptReconnect(), 5L, TimeUnit.SECONDS);
        }
    }

    public void listen(AddressedMessageListener listener) {
        AMQPConnectedListener connectedListener;
        if (this._listeners.containsKey(listener) && !(connectedListener = this._listeners.get(listener)).isClosed()) {
            logger.warning((Object)"Reconnecting listener", new Object[]{"listener", listener});
            try {
                connectedListener.close();
            }
            catch (IOException ex) {
                logger.warning((Object)"Could not close old listener connection", new Object[]{"listener", listener, ex});
            }
        }
        logger.info((Object)"Connecting listener", new Object[]{"listener", listener});
        connectedListener = new AMQPConnectedListener(listener.queueName, listener.address, listener, this._channelFactory);
        this._listeners.put(listener, connectedListener);
    }

    public void removeListener(AddressedMessageListener listener) {
        if (this._listeners.containsKey(listener)) {
            logger.warning((Object)"Removing listener", new Object[]{"listener", listener});
            AMQPConnectedListener connectedListener = this._listeners.remove(listener);
            if (!connectedListener.isClosed()) {
                try {
                    connectedListener.close();
                }
                catch (IOException ex) {
                    logger.warning((Object)"Barfed whiled trying to close a connected RabbitMQ listener", new Object[]{"listener", listener, "connectedListener", connectedListener, ex});
                }
            }
        } else {
            logger.warning((Object)"Tried to remove a deaf RabbitMQ message listener", new Object[]{"listener", listener});
        }
    }

    public synchronized void close() throws IOException {
        logger.info((Object)"Closing connection to RabbitMQ server.", new Object[0]);
        this._reconnectService.shutdown();
        this._sender.close();
        for (AddressedMessageListener listener : this._listeners.keySet()) {
            this.removeListener(listener);
        }
        if (this._conn != null) {
            this._conn.close(2000);
        }
    }

    public MessageSender getSender() {
        return this._sender;
    }

    private synchronized void connect() throws IOException {
        if (this._conn != null && this._conn.isOpen()) {
            return;
        }
        ConnectionParameters params = new ConnectionParameters();
        params.setUsername(this._config.username);
        params.setPassword(this._config.password);
        params.setVirtualHost(this._config.virtualHost);
        params.setRequestedHeartbeat(this._config.heartBeat);
        ConnectionFactory factory = new ConnectionFactory(params);
        logger.debug((Object)("Establishing connection to RabbitMQ server: " + this._config), new Object[0]);
        this._conn = factory.newConnection(this._config.hostAddresses);
        this._conn.addShutdownListener(new ShutdownListener(){

            public void shutdownCompleted(ShutdownSignalException ex) {
                if (!AMQPMessageConnection.this._reconnectService.isShutdown()) {
                    logger.warning((Object)"RabbitMQ connection closed unexpectedly.  Retrying every 5 seconds.", new Object[0]);
                    AMQPMessageConnection.this._reconnectService.schedule(new AttemptReconnect(), 5L, TimeUnit.SECONDS);
                } else {
                    logger.info((Object)"RabbitMQ connection closed.", new Object[0]);
                }
            }
        });
        logger.info((Object)("Connection established to RabbitMQ server: " + this._config), new Object[0]);
        for (AddressedMessageListener listener : this._listeners.keySet()) {
            this._reconnectService.schedule(new ListenerReconnectAttempt(listener), 0L, TimeUnit.SECONDS);
        }
    }

    protected class ListenerReconnectAttempt
    implements Runnable {
        protected AddressedMessageListener _listener;

        public ListenerReconnectAttempt(AddressedMessageListener listener) {
            this._listener = listener;
        }

        public void run() {
            try {
                AMQPMessageConnection.this.listen(this._listener);
                if (AMQPMessageConnection.this._listeners.get(this._listener).isClosed()) {
                    logger.warning((Object)"Failed to connect listener. Will retry.", new Object[]{"listener", this._listener});
                    AMQPMessageConnection.this._reconnectService.schedule(this, 5L, TimeUnit.SECONDS);
                } else {
                    logger.info((Object)"Listener connected", new Object[]{"listener", this._listener});
                }
            }
            catch (ShutdownSignalException sse) {
            }
            catch (Throwable ex) {
                logger.warning((Object)"Something nasty happened while trying to reconnect listener", new Object[]{"listener", this._listener, ex});
                AMQPMessageConnection.this._reconnectService.schedule(this, 60L, TimeUnit.SECONDS);
            }
        }
    }

    protected class AttemptReconnect
    implements Runnable {
        protected AttemptReconnect() {
        }

        public void run() {
            try {
                AMQPMessageConnection.this.connect();
            }
            catch (Throwable t) {
                logger.debug((Object)"Could not reconnect to RabbitMQ server.", new Object[]{t});
                AMQPMessageConnection.this._reconnectService.schedule(new AttemptReconnect(), 5L, TimeUnit.SECONDS);
            }
        }
    }
}

