/*
 * Decompiled with CFR 0.152.
 */
package com.threerings.nio.conman;

import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.samskivert.util.IntMap;
import com.samskivert.util.IntMaps;
import com.samskivert.util.Lifecycle;
import com.samskivert.util.LoopingThread;
import com.samskivert.util.Queue;
import com.samskivert.util.Tuple;
import com.threerings.NaryaLog;
import com.threerings.nio.SelectorIterable;
import com.threerings.nio.conman.ConMgrStats;
import com.threerings.nio.conman.Connection;
import com.threerings.nio.conman.NetEventHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Map;

public abstract class ConnectionManager
extends LoopingThread
implements Lifecycle.ShutdownComponent {
    protected PartialWriteHandler _oflowHandler = new PartialWriteHandler(){

        @Override
        public void handlePartialWrite(Connection conn, ByteBuffer msgbuf) {
            ConnectionManager.this._oflowqs.put(conn, new OverflowQueue(conn, msgbuf));
        }
    };
    protected Selector _selector;
    protected SelectorIterable _selectorSelector;
    protected Map<SelectionKey, NetEventHandler> _handlers = Maps.newHashMap();
    protected IntMap<Connection> _connections = IntMaps.newHashIntMap();
    protected Queue<Connection> _deathq = Queue.newQueue();
    protected Queue<SocketChannel> _acceptedq = Queue.newQueue();
    protected Queue<Tuple<Connection, byte[]>> _outq = Queue.newQueue();
    protected ByteBuffer _outbuf = ByteBuffer.allocateDirect(65536);
    protected Map<Connection, OverflowQueue> _oflowqs = Maps.newHashMap();
    protected ConMgrStats _stats = new ConMgrStats();
    protected long _lastDebugStamp;
    protected volatile Runnable _onExit;
    @Inject(optional=true)
    @Named(value="presents.net.selectLoopTime")
    protected int _selectLoopTime = 100;
    protected final long _idleTime;
    protected static final byte[] ASYNC_CLOSE_REQUEST = new byte[0];
    protected static final boolean DEBUG_REPORT = false;
    protected static final long DEBUG_REPORT_INTERVAL = 30000L;
    protected static final long LATENCY_GRACE = 30000L;

    public ConnectionManager(Lifecycle cycle, long idleTime) throws IOException {
        super("ConnectionManager");
        cycle.addComponent((Lifecycle.BaseComponent)this);
        this._selector = Selector.open();
        this._idleTime = idleTime;
    }

    public void setShutdownAction(Runnable onExit) {
        this._onExit = onExit;
    }

    public synchronized ConMgrStats getStats() {
        this._stats.connectionCount = this._connections.size();
        this._stats.handlerCount = this._handlers.size();
        this._stats.deathQueueSize = this._deathq.size();
        this._stats.outQueueSize = this._outq.size();
        if (this._oflowqs.size() > 0) {
            this._stats.overQueueSize = 0;
            for (OverflowQueue oq : this._oflowqs.values()) {
                this._stats.overQueueSize += oq.size();
            }
        }
        return this._stats.clone();
    }

    public SelectionKey register(SelectableChannel chan, int ops, NetEventHandler netEventHandler) throws IOException {
        SelectionKey key = chan.register(this._selector, ops);
        this._handlers.put(key, netEventHandler);
        return key;
    }

    public void transferAcceptedSocket(SocketChannel channel) {
        this._acceptedq.append((Object)channel);
    }

    public void closeConnection(Connection conn) {
        this._deathq.append((Object)conn);
    }

    protected void willStart() {
        super.willStart();
        this._selectorSelector = new SelectorIterable(this._selector, this._selectLoopTime, new SelectorIterable.SelectFailureHandler(){

            @Override
            public void handleSelectFailure(Exception e) {
                NaryaLog.log.error((Object)"One of our selectors crapped out completely.  Shutting down the connection manager.", new Object[]{e});
                ConnectionManager.this.shutdown();
            }
        });
    }

    protected void iterate() {
        Connection dconn;
        boolean generateDebugReport;
        long iterStamp = System.currentTimeMillis();
        boolean bl = generateDebugReport = iterStamp - this._lastDebugStamp > 30000L;
        while ((dconn = (Connection)this._deathq.getNonBlocking()) != null) {
            if (dconn.isClosed()) continue;
            dconn.close();
        }
        try {
            long idleStamp = iterStamp - this._idleTime;
            for (NetEventHandler handler : this._handlers.values()) {
                if (!handler.checkIdle(idleStamp)) continue;
                handler.becameIdle();
            }
        }
        catch (Exception e) {
            NaryaLog.log.warning((Object)("Failed to checkidle connection: " + e), new Object[0]);
        }
        this.sendOutgoingMessages(iterStamp);
        if (super.isRunning()) {
            this.handleIncoming(iterStamp);
        }
    }

    protected void handleIncoming(long iterStamp) {
        SocketChannel accepted;
        while ((accepted = (SocketChannel)this._acceptedq.getNonBlocking()) != null) {
            this.handleAcceptedSocket(accepted);
        }
        this.processIncomingEvents(iterStamp);
    }

    protected abstract void handleAcceptedSocket(SocketChannel var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleAcceptedSocket(SocketChannel channel, Connection conn) {
        try {
            channel.configureBlocking(false);
            conn.init(this, channel, System.currentTimeMillis());
            conn.selkey = this.register(channel, 1, conn);
            ConnectionManager connectionManager = this;
            synchronized (connectionManager) {
                ++this._stats.connects;
            }
        }
        catch (IOException ioe) {
            NaryaLog.log.info((Object)("Failure accepting new connection: " + ioe), new Object[0]);
            try {
                channel.socket().close();
            }
            catch (IOException ioe2) {
                NaryaLog.log.warning((Object)("Failed closing aborted connection: " + ioe2), new Object[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processIncomingEvents(long iterStamp) {
        long bytesIn = 0L;
        long msgsIn = 0L;
        long eventCount = 0L;
        for (SelectionKey selkey : this._selectorSelector) {
            ++eventCount;
            NetEventHandler handler = null;
            try {
                handler = this._handlers.get(selkey);
                if (handler == null) {
                    if (!selkey.isValid()) continue;
                    try {
                        NaryaLog.log.warning((Object)"Received network event for unknown handler", new Object[]{"key", selkey, "ops", selkey.readyOps()});
                        selkey.cancel();
                    }
                    catch (CancelledKeyException cancelledKeyException) {}
                    continue;
                }
                int got = handler.handleEvent(iterStamp);
                if (got == 0) continue;
                bytesIn += (long)got;
                ++msgsIn;
            }
            catch (Exception e) {
                NaryaLog.log.warning((Object)("Error processing network data: " + handler + "."), new Object[]{e});
                if (handler == null || !(handler instanceof Connection)) continue;
                this.closeConnection((Connection)handler);
            }
        }
        Object object = this;
        synchronized (object) {
            this._stats.eventCount += eventCount;
            this._stats.bytesIn += bytesIn;
            this._stats.msgsIn += msgsIn;
        }
    }

    protected void sendOutgoingMessages(long iterStamp) {
        Tuple tup;
        if (this._oflowqs.size() > 0) {
            for (OverflowQueue oq : this._oflowqs.values().toArray(new OverflowQueue[this._oflowqs.size()])) {
                try {
                    if (!oq.writeOverflowMessages(iterStamp)) continue;
                    this._oflowqs.remove(oq.conn);
                }
                catch (IOException ioe) {
                    oq.conn.networkFailure(ioe);
                }
            }
        }
        while ((tup = (Tuple)this._outq.getNonBlocking()) != null) {
            Connection conn = (Connection)tup.left;
            OverflowQueue oqueue = this._oflowqs.get(conn);
            if (oqueue != null) {
                int size = oqueue.size();
                if (size > 500 && size % 50 == 0) {
                    NaryaLog.log.warning((Object)("Aiya, big overflow queue for " + conn + ""), new Object[]{"size", size, "bytes", ((byte[])tup.right).length});
                }
                oqueue.add(tup.right);
                continue;
            }
            this.writeMessage(conn, (byte[])tup.right, this._oflowHandler);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean writeMessage(Connection conn, byte[] data, PartialWriteHandler pwh) {
        if (conn.isClosed()) {
            return true;
        }
        if (data == ASYNC_CLOSE_REQUEST) {
            this.closeConnection(conn);
            return true;
        }
        if (data.length > 0x100000) {
            NaryaLog.log.warning((Object)"Refusing to write very large message", new Object[]{"conn", conn, "size", data.length});
            return true;
        }
        if (data.length > this._outbuf.capacity()) {
            int ncapacity = Math.max(this._outbuf.capacity() << 1, data.length);
            NaryaLog.log.info((Object)"Expanding output buffer size", new Object[]{"nsize", ncapacity});
            this._outbuf = ByteBuffer.allocateDirect(ncapacity);
        }
        boolean fully = true;
        try {
            this._outbuf.put(data);
            this._outbuf.flip();
            SocketChannel sochan = conn.getChannel();
            if (sochan.isConnectionPending()) {
                pwh.handlePartialWrite(conn, this._outbuf);
                boolean bl = false;
                return bl;
            }
            int wrote = sochan.write(this._outbuf);
            this.noteWrite(1, wrote);
            if (this._outbuf.remaining() > 0) {
                fully = false;
                pwh.handlePartialWrite(conn, this._outbuf);
            }
        }
        catch (NotYetConnectedException nyce) {
            pwh.handlePartialWrite(conn, this._outbuf);
            boolean bl = false;
            return bl;
        }
        catch (IOException ioe) {
            conn.networkFailure(ioe);
        }
        finally {
            this._outbuf.clear();
        }
        return fully;
    }

    protected synchronized void noteWrite(int msgs, int bytes) {
        this._stats.msgsOut += (long)msgs;
        this._stats.bytesOut += (long)bytes;
    }

    protected void postAsyncClose(Connection conn) {
        this._outq.append((Object)Tuple.newTuple((Object)conn, (Object)ASYNC_CLOSE_REQUEST));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void connectionFailed(Connection conn, IOException ioe) {
        this._handlers.remove(conn.selkey);
        this._connections.remove(conn.getConnectionId());
        this._oflowqs.remove(conn);
        ConnectionManager connectionManager = this;
        synchronized (connectionManager) {
            ++this._stats.disconnects;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void connectionClosed(Connection conn) {
        this._handlers.remove(conn.selkey);
        this._connections.remove(conn.getConnectionId());
        this._oflowqs.remove(conn);
        ConnectionManager connectionManager = this;
        synchronized (connectionManager) {
            ++this._stats.closes;
        }
    }

    protected void handleIterateFailure(Exception e) {
        NaryaLog.log.warning((Object)"ConnectionManager.iterate() uncaught exception.", new Object[]{e});
    }

    protected void didShutdown() {
        Runnable onExit;
        this.sendOutgoingMessages(System.currentTimeMillis());
        if (this._outq.size() > 0) {
            NaryaLog.log.warning((Object)("Connection Manager failed to deliver " + this._outq.size() + " message(s)."), new Object[0]);
        }
        if ((onExit = this._onExit) != null) {
            NaryaLog.log.info((Object)"Connection Manager thread exited (running onExit).", new Object[0]);
            onExit.run();
        } else {
            NaryaLog.log.info((Object)"Connection Manager thread exited.", new Object[0]);
        }
    }

    protected class OverflowQueue
    extends ArrayList<byte[]>
    implements PartialWriteHandler {
        public Connection conn;
        protected ByteBuffer _partial;
        protected int _msgs;
        protected int _partials;

        public OverflowQueue(Connection conn, ByteBuffer message) {
            this.conn = conn;
            this.handlePartialWrite(conn, message);
        }

        public boolean writeOverflowMessages(long iterStamp) throws IOException {
            if (this._partial != null) {
                SocketChannel sochan = this.conn.getChannel();
                if (sochan == null || !sochan.isConnected() && !sochan.isConnectionPending()) {
                    throw new IOException("Connection unavailable for overflow write " + sochan);
                }
                if (sochan.isConnectionPending()) {
                    return false;
                }
                int wrote = sochan.write(this._partial);
                ConnectionManager.this.noteWrite(0, wrote);
                if (this._partial.remaining() == 0) {
                    this._partial = null;
                    ++this._partials;
                } else {
                    return false;
                }
            }
            while (this.size() > 0) {
                byte[] data = (byte[])this.remove(0);
                ++this._msgs;
                if (ConnectionManager.this.writeMessage(this.conn, data, this)) continue;
                return false;
            }
            return true;
        }

        @Override
        public void handlePartialWrite(Connection wconn, ByteBuffer buffer) {
            this._partial = ByteBuffer.allocateDirect(buffer.remaining());
            this._partial.put(buffer);
            this._partial.flip();
        }

        @Override
        public String toString() {
            return "[conn=" + this.conn + ", partials=" + this._partials + ", msgs=" + this._msgs + "]";
        }
    }

    protected static interface PartialWriteHandler {
        public void handlePartialWrite(Connection var1, ByteBuffer var2);
    }
}

