/*
 * Decompiled with CFR 0.152.
 */
package com.threerings.messaging.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownSignalException;
import com.samskivert.util.Logger;
import com.threerings.messaging.DestinationAddress;
import com.threerings.messaging.MessageSender;
import com.threerings.messaging.OutMessage;
import com.threerings.messaging.ReplyingDestination;
import com.threerings.messaging.amqp.AMQPReplyingDestination;
import com.threerings.messaging.amqp.ChannelFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class AMQPMessageSender
implements MessageSender {
    protected final ChannelFactory _channelFactory;
    protected final Set<String> _declaredExchanges;
    protected final Set<AMQPReplyingDestination> _destinations = new CopyOnWriteArraySet<AMQPReplyingDestination>();
    protected static final Logger logger = Logger.getLogger(AMQPMessageSender.class);

    public AMQPMessageSender(ChannelFactory channelFactory) {
        this._channelFactory = channelFactory;
        this._declaredExchanges = new HashSet<String>();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendMessage(OutMessage msg, DestinationAddress addr) throws IOException {
        logger.info((Object)"Sending AMQP message", new Object[]{"msg", msg, "addr", addr});
        int retries = 1;
        while (true) {
            Channel channel = this._channelFactory.createChannel();
            try {
                AMQPMessageSender aMQPMessageSender = this;
                synchronized (aMQPMessageSender) {
                    if (!this._declaredExchanges.contains(addr.exchange)) {
                        logger.info((Object)"Declaring AMQP exchange", new Object[]{"exchange", addr.exchange});
                        channel.exchangeDeclare(addr.exchange, "direct", true);
                        this._declaredExchanges.add(addr.exchange);
                    }
                }
                channel.basicPublish(addr.exchange, addr.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, msg.encodeMessage());
                return;
            }
            catch (ShutdownSignalException sse) {
                if (retries == 0) {
                    throw sse;
                }
                --retries;
                continue;
            }
            finally {
                try {
                    channel.close(200, "Message sent.");
                }
                catch (ShutdownSignalException sse) {}
                continue;
            }
            break;
        }
    }

    public ReplyingDestination createReplyingDestination(DestinationAddress addr) throws IOException {
        AMQPReplyingDestination dest = new AMQPReplyingDestination(this._channelFactory, addr);
        this._destinations.add(dest);
        return dest;
    }

    public void close() throws IOException {
        for (AMQPReplyingDestination destination : this._destinations) {
            if (destination.isClosed()) continue;
            destination.close();
        }
    }
}

