/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.proto;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.PerChannelBookieClient;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side entry point for talking to bookies.
 *
 * <p>Keeps one {@link PerChannelBookieClient} per bookie address and forwards
 * add/read requests to it once that per-channel client reports it is ready.
 * The address-to-client map is a {@link ConcurrentHashMap}, and
 * {@link #lookupClient(InetSocketAddress)} uses {@code putIfAbsent}, so
 * concurrent lookups of the same address resolve to the same client instance.
 */
public class BookieClient {
    static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
    /** Shared counter handed to every {@link PerChannelBookieClient} created by this client. */
    AtomicLong totalBytesOutstanding = new AtomicLong();
    OrderedSafeExecutor executor;
    ClientSocketChannelFactory channelFactory;
    /** One per-channel client per bookie address; populated lazily by {@link #lookupClient}. */
    ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient> channels =
            new ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient>();
    private final ClientConfiguration conf;

    /**
     * Creates a client that builds its per-bookie channels from the given factory and executor.
     *
     * @param conf client configuration (stored; not otherwise read in this class)
     * @param channelFactory factory passed to each {@link PerChannelBookieClient}
     * @param executor executor passed to each {@link PerChannelBookieClient}
     */
    public BookieClient(ClientConfiguration conf, ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
        this.conf = conf;
        this.channelFactory = channelFactory;
        this.executor = executor;
    }

    /**
     * Returns the per-channel client registered for {@code addr}, creating and
     * registering one if none exists yet.
     *
     * <p>If two threads race to create a client for the same address, the one
     * that loses the {@code putIfAbsent} discards its freshly built instance
     * and returns the instance that won.
     *
     * @param addr bookie address
     * @return the client registered for {@code addr}; never {@code null}
     */
    public PerChannelBookieClient lookupClient(InetSocketAddress addr) {
        PerChannelBookieClient channel = this.channels.get(addr);
        if (channel == null) {
            PerChannelBookieClient created = new PerChannelBookieClient(
                    this.executor, this.channelFactory, addr, this.totalBytesOutstanding);
            PerChannelBookieClient previous = this.channels.putIfAbsent(addr, created);
            // A non-null result means another thread registered a client first; use theirs.
            channel = previous != null ? previous : created;
        }
        return channel;
    }

    /**
     * Asynchronously adds an entry on the bookie at {@code addr}.
     *
     * <p>The request is handed to the per-channel client only after
     * {@code connectIfNeededAndDoOp} completes with {@code rc == 0}. For any
     * other {@code rc}, {@code cb.writeComplete} is invoked directly with that
     * code and the entry is not sent.
     *
     * @param addr bookie address
     * @param ledgerId ledger the entry belongs to
     * @param masterKey master key forwarded to the per-channel client
     * @param entryId id of the entry
     * @param toSend payload forwarded to the per-channel client
     * @param cb callback notified of the outcome
     * @param ctx opaque context passed back to {@code cb}
     * @param options flags forwarded unchanged to the per-channel client
     */
    public void addEntry(final InetSocketAddress addr, final long ledgerId, final byte[] masterKey, final long entryId, final ChannelBuffer toSend, final BookkeeperInternalCallbacks.WriteCallback cb, final Object ctx, final int options) {
        final PerChannelBookieClient client = this.lookupClient(addr);
        client.connectIfNeededAndDoOp(new BookkeeperInternalCallbacks.GenericCallback<Void>(){

            @Override
            public void operationComplete(int rc, Void result) {
                if (rc != 0) {
                    // Channel not usable: report the code to the caller instead of sending.
                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
                    return;
                }
                client.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx, options);
            }
        });
    }

    /**
     * Asynchronously reads an entry from the bookie at {@code addr}.
     *
     * <p>The request is handed to the per-channel client only after
     * {@code connectIfNeededAndDoOp} completes with {@code rc == 0}. For any
     * other {@code rc}, {@code cb.readEntryComplete} is invoked directly with
     * that code and a {@code null} buffer.
     *
     * @param addr bookie address
     * @param ledgerId ledger to read from
     * @param entryId id of the entry to read
     * @param cb callback notified of the outcome
     * @param ctx opaque context passed back to {@code cb}
     * @param options flags forwarded unchanged to the per-channel client
     */
    public void readEntry(InetSocketAddress addr, final long ledgerId, final long entryId, final BookkeeperInternalCallbacks.ReadEntryCallback cb, final Object ctx, final int options) {
        final PerChannelBookieClient client = this.lookupClient(addr);
        client.connectIfNeededAndDoOp(new BookkeeperInternalCallbacks.GenericCallback<Void>(){

            @Override
            public void operationComplete(int rc, Void result) {
                if (rc != 0) {
                    // Channel not usable: report the code with no data.
                    cb.readEntryComplete(rc, ledgerId, entryId, null, ctx);
                    return;
                }
                client.readEntry(ledgerId, entryId, cb, ctx, options);
            }
        });
    }

    /**
     * Closes every per-channel client currently registered. Entries are left
     * in the map; this method does not shut down the channel factory or the
     * executor, which are owned by whoever constructed this client.
     */
    public void close() {
        for (PerChannelBookieClient channel : this.channels.values()) {
            channel.close();
        }
    }

    /**
     * Command-line smoke test: issues 100000 add-entry requests carrying the
     * bytes of {@code "hello"} to one bookie, waits until every request has
     * been called back, and prints the number of requests issued.
     *
     * @param args {@code bookieHost port ledger#}
     * @throws NumberFormatException if the port or ledger id is not numeric
     * @throws InterruptedException if interrupted while waiting for callbacks
     */
    public static void main(String[] args) throws NumberFormatException, IOException, InterruptedException {
        if (args.length != 3) {
            System.err.println("USAGE: BookieClient bookieHost port ledger#");
            return;
        }
        // Parse all arguments before allocating anything so that malformed input
        // cannot leave a channel factory or executor behind un-shut-down.
        long ledger = Long.parseLong(args[2]);
        InetSocketAddress addr = new InetSocketAddress(args[0], Integer.parseInt(args[1]));

        BookkeeperInternalCallbacks.WriteCallback cb = new BookkeeperInternalCallbacks.WriteCallback(){

            @Override
            public void writeComplete(int rc, long ledger, long entry, InetSocketAddress addr, Object ctx) {
                Counter counter = (Counter)ctx;
                counter.dec();
                if (rc != 0) {
                    System.out.println("rc = " + rc + " for " + entry + "@" + ledger);
                }
            }
        };
        Counter counter = new Counter();
        byte[] hello = "hello".getBytes();
        NioClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
        BookieClient bc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
        try {
            for (int i = 0; i < 100000; ++i) {
                counter.inc();
                bc.addEntry(addr, ledger, new byte[0], i, ChannelBuffers.wrappedBuffer(hello), cb, counter, 0);
            }
            // Block until every issued request has been called back.
            counter.wait(0);
            System.out.println("Total = " + counter.total());
        } finally {
            // Close the channels before releasing the factory that created them,
            // and tear everything down even if the run was interrupted or failed.
            bc.close();
            channelFactory.releaseExternalResources();
            executor.shutdown();
        }
    }

    /**
     * Monitor-based tally of in-flight requests used by {@link #main}.
     * {@code i} is the number currently outstanding; {@code total} is the
     * number ever issued. All access goes through synchronized methods.
     */
    private static class Counter {
        int i;
        int total;

        private Counter() {
        }

        /** Records one more issued (and therefore outstanding) request. */
        synchronized void inc() {
            ++this.i;
            ++this.total;
        }

        /** Records one completed request and wakes any thread blocked in {@link #wait(int)}. */
        synchronized void dec() {
            --this.i;
            this.notifyAll();
        }

        /**
         * Blocks until the outstanding count is at most {@code limit}.
         *
         * <p>Note: this overloads, and does not override, {@link Object#wait(long)};
         * an {@code int} argument selects this method.
         *
         * @param limit highest outstanding count at which to return
         * @throws InterruptedException if interrupted while waiting
         */
        synchronized void wait(int limit) throws InterruptedException {
            // Loop guards against spurious wake-ups and notifications that arrive
            // while the count is still above the limit.
            while (this.i > limit) {
                this.wait();
            }
        }

        /** @return the number of requests issued so far */
        synchronized int total() {
            return this.total;
        }
    }
}

