/*
 * Decompiled with CFR 0.152.
 */
package com.meidusa.toolkit.net;

import com.meidusa.toolkit.net.AbstractConnection;
import com.meidusa.toolkit.net.Connection;
import com.meidusa.toolkit.net.ConnectionObserver;
import com.meidusa.toolkit.net.FrontendConnection;
import com.meidusa.toolkit.net.buffer.BufferPool;
import com.meidusa.toolkit.net.util.LoopingThread;
import com.meidusa.toolkit.util.ExecutorUtil;
import com.meidusa.toolkit.util.NameableExecutor;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionManager
extends LoopingThread {
    protected static final int CONNECTION_ESTABLISHED = 0;
    protected static final int CONNECTION_FAILED = 1;
    protected static final int CONNECTION_CLOSED = 2;
    protected static final int CONNECTION_AUTHENTICATE_SUCCESS = 3;
    protected static final int CONNECTION_AUTHENTICATE_FAILD = 4;
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);
    private static final Logger REPORT = LoggerFactory.getLogger((String)"venus.report.queue");
    private static final List<ConnectionManager> MANAGERS = new ArrayList<ConnectionManager>();
    protected List<ConnectionObserver> _observers = new ArrayList<ConnectionObserver>();
    private final Selector selector;
    private final BlockingQueue<Connection> registerQueue;
    private final ConcurrentMap<Long, AbstractConnection> frontends;
    private final ConcurrentMap<Long, AbstractConnection> backends;
    private final NameableExecutor executor;
    private final BufferPool bufferPool;
    private long netInBytes;
    private long netOutBytes;

    public ConnectionManager(String name, int executorSize) throws IOException {
        this.setName(name);
        this.bufferPool = new BufferPool(0x1000000, 8192);
        this.selector = Selector.open();
        this.registerQueue = new LinkedBlockingQueue<Connection>();
        this.frontends = new ConcurrentHashMap<Long, AbstractConnection>();
        this.backends = new ConcurrentHashMap<Long, AbstractConnection>();
        this.executor = executorSize > 0 ? ExecutorUtil.create(name + "-Eexecutor", executorSize) : null;
        MANAGERS.add(this);
    }

    @Override
    protected void willStart() {
        super.willStart();
    }

    public long getNetInBytes() {
        return this.netInBytes;
    }

    public void addNetInBytes(long bytes) {
        this.netInBytes += bytes;
    }

    public long getNetOutBytes() {
        return this.netOutBytes;
    }

    public void addNetOutBytes(long bytes) {
        this.netOutBytes += bytes;
    }

    public BufferPool getBufferPool() {
        return this.bufferPool;
    }

    final void postRegister(Connection c) {
        this.registerQueue.offer(c);
        this.selector.wakeup();
    }

    final BlockingQueue<Connection> getRegisterQueue() {
        return this.registerQueue;
    }

    public NameableExecutor getExecutor() {
        return this.executor;
    }

    public void addConnection(AbstractConnection c) {
        if (c instanceof FrontendConnection) {
            this.frontends.put(c.getId(), c);
        } else {
            this.backends.put(c.getId(), c);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void iterate() throws IOException {
        this.selector.select(1000L);
        this.register(this.selector);
        Set<SelectionKey> keys = this.selector.selectedKeys();
        try {
            for (SelectionKey key : keys) {
                Object att = key.attachment();
                if (att != null && key.isValid()) {
                    int readyOps = key.readyOps();
                    if ((readyOps & 1) != 0) {
                        this.read((Connection)att);
                        continue;
                    }
                    if ((readyOps & 4) != 0) {
                        this.write((Connection)att);
                        continue;
                    }
                    key.cancel();
                    continue;
                }
                key.cancel();
            }
        }
        finally {
            keys.clear();
        }
    }

    @Override
    protected void handleIterateFailure(Throwable e) {
        if (e instanceof ClosedSelectorException) {
            super.handleIterateFailure(e);
        } else if (e instanceof InterruptedException) {
            super.handleIterateFailure(e);
        } else {
            LOGGER.error(this.getName(), e);
        }
    }

    private void register(Selector selector) {
        Connection c = null;
        while ((c = (Connection)this.registerQueue.poll()) != null) {
            try {
                c.register(selector);
                if (c.isClosed()) continue;
                this.notifyObservers(0, c, null);
            }
            catch (Throwable e) {
                c.handleError(16003004, e);
            }
        }
    }

    private void read(Connection c) {
        try {
            c.read();
        }
        catch (Throwable e) {
            c.handleError(16003005, e);
        }
    }

    private void write(Connection c) {
        try {
            c.writeByEvent();
        }
        catch (Throwable e) {
            c.handleError(16003007, e);
        }
    }

    public int getFrontends() {
        return this.frontends.size();
    }

    public int getBackends() {
        return this.backends.size();
    }

    private void connectionsCheck() {
        AbstractConnection c;
        Iterator it = this.frontends.entrySet().iterator();
        while (it.hasNext()) {
            c = (AbstractConnection)it.next().getValue();
            if (c == null) {
                it.remove();
                continue;
            }
            if (c.isClosed()) {
                it.remove();
                c.cleanup();
                continue;
            }
            try {
                c.idleCheck();
            }
            catch (CancelledKeyException e) {}
        }
        it = this.backends.entrySet().iterator();
        while (it.hasNext()) {
            c = (AbstractConnection)it.next().getValue();
            if (c == null) {
                it.remove();
                continue;
            }
            if (c.isClosed()) {
                it.remove();
                c.cleanup();
                continue;
            }
            try {
                c.idleCheck();
            }
            catch (CancelledKeyException cancelledKeyException) {}
        }
    }

    public void addConnectionObserver(ConnectionObserver observer) {
        this._observers.add(observer);
    }

    public void removeConnectionObserver(ConnectionObserver observer) {
        this._observers.remove(observer);
    }

    protected void notifyObservers(int code, Connection conn, Object arg1) {
        block5: for (ConnectionObserver obs : this._observers) {
            switch (code) {
                case 0: {
                    obs.connectionEstablished(conn);
                    continue block5;
                }
                case 1: {
                    obs.connectionFailed(conn, (Exception)arg1);
                    continue block5;
                }
                case 2: {
                    obs.connectionClosed(conn);
                    continue block5;
                }
            }
            throw new RuntimeException("Invalid code supplied to notifyObservers: " + code);
        }
    }

    static {
        new IdleChecker().start();
    }

    private static class IdleChecker
    extends LoopingThread {
        IdleChecker() {
            this.setName("Connection Idle Checker");
            this.setDaemon(true);
        }

        @Override
        public void iterate() {
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            int queueSize = 0;
            for (ConnectionManager manager : MANAGERS) {
                manager.connectionsCheck();
                if (manager.getExecutor() == null) continue;
                queueSize = manager.getExecutor().getQueue().size();
            }
            if (queueSize > 100) {
                if (REPORT.isInfoEnabled()) {
                    REPORT.info("server total queue=" + queueSize);
                }
            } else if (REPORT.isDebugEnabled()) {
                REPORT.debug("server total queue=" + queueSize);
            }
        }

        @Override
        protected void handleIterateFailure(Throwable e) {
            LOGGER.warn("Idle checker running exception", e);
        }
    }
}

