package com.threerings.messaging.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownSignalException;
import com.samskivert.util.Logger;
import com.threerings.coin.server.persist.CoinTransaction;
import com.threerings.messaging.DestinationAddress;
import com.threerings.messaging.MessageSender;
import com.threerings.messaging.OutMessage;
import com.threerings.messaging.ReplyingDestination;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/* loaded from: input_file:com/threerings/messaging/amqp/AMQPMessageSender.class */
/**
 * {@link MessageSender} implementation that publishes messages to an AMQP broker using
 * channels obtained from a {@link ChannelFactory}.
 *
 * <p>A fresh channel is created for every send attempt and is always closed again before
 * the attempt finishes, whether it succeeded or failed. Exchanges are declared on first
 * use and remembered for the lifetime of this sender so they are declared only once.
 */
public class AMQPMessageSender implements MessageSender {
    /** Source of the channels used for publishing and handed to replying destinations. */
    protected final ChannelFactory _channelFactory;
    protected static final Logger logger = Logger.getLogger(AMQPMessageSender.class);
    /** Every replying destination handed out by this sender, so {@link #close()} can close them. */
    protected final Set<AMQPReplyingDestination> _destinations = new CopyOnWriteArraySet<>();
    /** Names of exchanges already declared by this sender; accessed only while synchronized on {@code this}. */
    protected final Set<String> _declaredExchanges = new HashSet<>();

    /**
     * Creates a sender that obtains its channels from the given factory.
     *
     * @param channelFactory factory used to create a channel for each send attempt
     */
    public AMQPMessageSender(ChannelFactory channelFactory) {
        this._channelFactory = channelFactory;
    }

    /**
     * Publishes {@code outMessage} to the exchange and routing key named by
     * {@code destinationAddress}, declaring the exchange first if this sender has not
     * declared it yet.
     *
     * <p>If the attempt fails with a {@link ShutdownSignalException} it is retried once on a
     * new channel; a second such failure is rethrown to the caller.
     *
     * @param outMessage the message to encode and publish
     * @param destinationAddress the exchange and routing key to publish to
     * @throws IOException if creating the channel, declaring the exchange, publishing, or
     *     closing the channel fails
     * @throws ShutdownSignalException if the retry also fails with a shutdown signal
     */
    @Override // com.threerings.messaging.MessageSender
    public void sendMessage(OutMessage outMessage, DestinationAddress destinationAddress) throws IOException {
        logger.info("Sending AMQP message", new Object[]{"msg", outMessage, "addr", destinationAddress});
        int retriesLeft = 1;
        while (true) {
            Channel channel = this._channelFactory.createChannel();
            try {
                // Serialize the check-then-declare so concurrent senders do not race on the
                // HashSet or declare the same exchange twice.
                synchronized (this) {
                    if (!this._declaredExchanges.contains(destinationAddress.exchange)) {
                        logger.info("Declaring AMQP exchange", new Object[]{"exchange", destinationAddress.exchange});
                        channel.exchangeDeclare(destinationAddress.exchange, "direct", true);
                        this._declaredExchanges.add(destinationAddress.exchange);
                    }
                }
                channel.basicPublish(destinationAddress.exchange, destinationAddress.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, outMessage.encodeMessage());
                return;
            } catch (ShutdownSignalException e) {
                if (retriesLeft == 0) {
                    throw e;
                }
                retriesLeft--;
            } finally {
                // Runs on success, on retry, and on every failure path, so the channel is
                // never leaked when the retry budget is exhausted or an IOException escapes.
                closeChannel(channel);
            }
        }
    }

    /**
     * Closes a channel used for a single send attempt, ignoring the shutdown signal raised
     * when the channel is already gone.
     *
     * @param channel the channel to close
     * @throws IOException if closing the channel fails for any other I/O reason
     */
    private void closeChannel(Channel channel) throws IOException {
        try {
            // NOTE(review): CoinTransaction.ELEVATOR is passed as the AMQP close code; it is
            // presumably a decompiler substitution for a numerically equal reply-code
            // constant - confirm against the original source.
            channel.close(CoinTransaction.ELEVATOR, "Message sent.");
        } catch (ShutdownSignalException ignored) {
            // The channel has already shut down, so there is nothing left to close.
        }
    }

    /**
     * Creates a replying destination for the given address and registers it with this
     * sender so that it is closed by {@link #close()}.
     *
     * @param destinationAddress the address the replying destination is bound to
     * @return the newly created destination
     * @throws IOException declared by the {@link MessageSender} contract
     */
    @Override // com.threerings.messaging.MessageSender
    public ReplyingDestination createReplyingDestination(DestinationAddress destinationAddress) throws IOException {
        AMQPReplyingDestination destination = new AMQPReplyingDestination(this._channelFactory, destinationAddress);
        this._destinations.add(destination);
        return destination;
    }

    /**
     * Closes every replying destination created by this sender that is not already closed.
     *
     * @throws IOException if closing a destination fails; destinations later in the
     *     iteration are then left untouched
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        for (AMQPReplyingDestination destination : this._destinations) {
            if (!destination.isClosed()) {
                destination.close();
            }
        }
    }
}
