/*
 * Decompiled with CFR 0.152.
 */
package com.threerings.messaging.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.RpcClient;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.BlockingCell;
import com.threerings.messaging.DestinationAddress;
import com.threerings.messaging.OutMessage;
import com.threerings.messaging.ReplyingDestination;
import com.threerings.messaging.amqp.ChannelFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AMQPReplyingDestination
implements ReplyingDestination {
    protected int _correlationId;
    protected RpcClient _client;
    protected Channel _channel;
    protected final ChannelFactory _channelFactory;
    protected final DestinationAddress _destAddress;

    public AMQPReplyingDestination(ChannelFactory channelFactory, DestinationAddress addr) throws IOException {
        this._channelFactory = channelFactory;
        this._destAddress = addr;
        this.createClient();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public byte[] sendMessage(OutMessage msg, long timeout) throws IOException, TimeoutException {
        BlockingCell k;
        int retries = 1;
        while (true) {
            try {
                AMQP.BasicProperties props;
                this._client.checkConsumer();
                k = new BlockingCell();
                Object object = this._client.getContinuationMap();
                synchronized (object) {
                    String replyId = Integer.toString(this._correlationId++);
                    props = new AMQP.BasicProperties(null, null, null, null, null, replyId, this._client.getReplyQueue(), null, null, null, null, null, null, null);
                    this._client.getContinuationMap().put(replyId, k);
                }
                object = this;
                synchronized (object) {
                    this._client.publish(props, msg.encodeMessage());
                }
            }
            catch (ShutdownSignalException sse) {
                if (retries == 0) {
                    throw sse;
                }
                this.createClient();
                --retries;
                continue;
            }
            catch (IOException ioe) {
                if (retries == 0) {
                    throw ioe;
                }
                this.createClient();
                --retries;
                continue;
            }
            break;
        }
        Object reply = k.uninterruptibleGet((int)timeout);
        if (reply instanceof ShutdownSignalException) {
            ShutdownSignalException sig = (ShutdownSignalException)((Object)reply);
            IOException ioe = new IOException("Channel shutdown.");
            ioe.initCause(sig);
            throw ioe;
        }
        return (byte[])reply;
    }

    public synchronized void close() throws IOException {
        this._client.close();
        if (this._channel.isOpen()) {
            this._channel.close(200, "Replying destination closed.");
        }
        this._client = null;
    }

    public synchronized boolean isClosed() {
        return this._client == null;
    }

    private synchronized void createClient() throws IOException {
        this._channel = this._channelFactory.createChannel();
        this._channel.exchangeDeclare(this._destAddress.exchange, "direct", true);
        this._client = new RpcClient(this._channel, this._destAddress.exchange, this._destAddress.getRoutingKey());
        this._channel.queueBind(this._client.getReplyQueue(), this._destAddress.exchange, this._client.getReplyQueue());
    }
}

