/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.DigestManager;
import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.client.LedgerRecoveryOp;
import org.apache.bookkeeper.client.MacDigestManager;
import org.apache.bookkeeper.client.PendingAddOp;
import org.apache.bookkeeper.client.PendingReadOp;
import org.apache.bookkeeper.client.ReadLastConfirmedOp;
import org.apache.bookkeeper.client.RoundRobinDistributionSchedule;
import org.apache.bookkeeper.client.SyncCounter;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.jboss.netty.buffer.ChannelBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LedgerHandle {
    static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
    static final long LAST_ADD_CONFIRMED = -1L;
    final byte[] ledgerKey;
    LedgerMetadata metadata;
    final BookKeeper bk;
    final long ledgerId;
    long lastAddPushed;
    long lastAddConfirmed;
    long length;
    final DigestManager macManager;
    final DistributionSchedule distributionSchedule;
    final Semaphore opCounterSem;
    private final Integer throttling;
    final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();

    LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata, BookKeeper.DigestType digestType, byte[] password) throws GeneralSecurityException, NumberFormatException {
        this.bk = bk;
        this.metadata = metadata;
        if (metadata.isClosed()) {
            this.lastAddConfirmed = this.lastAddPushed = metadata.close;
            this.length = metadata.length;
        } else {
            this.lastAddPushed = -1L;
            this.lastAddConfirmed = -1L;
            this.length = 0L;
        }
        this.ledgerId = ledgerId;
        this.throttling = bk.getConf().getThrottleValue();
        this.opCounterSem = new Semaphore(this.throttling);
        this.macManager = DigestManager.instantiate(ledgerId, password, digestType);
        this.ledgerKey = MacDigestManager.genDigest("ledger", password);
        this.distributionSchedule = new RoundRobinDistributionSchedule(metadata.quorumSize, metadata.ensembleSize);
    }

    public long getId() {
        return this.ledgerId;
    }

    public long getLastAddConfirmed() {
        return this.lastAddConfirmed;
    }

    public long getLastAddPushed() {
        return this.lastAddPushed;
    }

    public byte[] getLedgerKey() {
        return this.ledgerKey;
    }

    LedgerMetadata getLedgerMetadata() {
        return this.metadata;
    }

    DigestManager getDigestManager() {
        return this.macManager;
    }

    Semaphore getAvailablePermits() {
        return this.opCounterSem;
    }

    long addToLength(long delta) {
        this.length += delta;
        return this.length;
    }

    public long getLength() {
        return this.length;
    }

    DistributionSchedule getDistributionSchedule() {
        return this.distributionSchedule;
    }

    void writeLedgerConfig(AsyncCallback.StatCallback callback, Object ctx) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing metadata to ZooKeeper: " + this.ledgerId + ", " + this.metadata.getZnodeVersion());
        }
        this.bk.getZkHandle().setData(this.bk.getLedgerManager().getLedgerPath(this.ledgerId), this.metadata.serialize(), this.metadata.getZnodeVersion(), callback, ctx);
    }

    public void close() throws InterruptedException, BKException {
        SyncCounter counter = new SyncCounter();
        counter.inc();
        this.asyncClose(new SyncCloseCallback(), counter);
        counter.block(0);
        if (counter.getrc() != 0) {
            throw BKException.create(counter.getrc());
        }
    }

    public void asyncClose(AsyncCallback.CloseCallback cb, Object ctx) {
        this.asyncCloseInternal(cb, ctx, -11);
    }

    void asyncCloseInternal(final AsyncCallback.CloseCallback cb, final Object ctx, final int rc) {
        this.bk.mainWorkerPool.submitOrdered(this.ledgerId, new SafeRunnable(){

            @Override
            public void safeRun() {
                LedgerHandle.this.metadata.length = LedgerHandle.this.length;
                LedgerHandle.this.metadata.close(LedgerHandle.this.lastAddConfirmed);
                LedgerHandle.this.errorOutPendingAdds(rc);
                LedgerHandle.this.lastAddPushed = LedgerHandle.this.lastAddConfirmed;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Closing ledger: " + LedgerHandle.this.ledgerId + " at entryId: " + LedgerHandle.this.metadata.close + " with this many bytes: " + LedgerHandle.this.metadata.length);
                }
                LedgerHandle.this.writeLedgerConfig(new AsyncCallback.StatCallback(){

                    public void processResult(int rc, String path, Object subctx, Stat stat) {
                        if (rc != KeeperException.Code.OK.intValue()) {
                            LOG.warn("Conditional write failed: " + KeeperException.Code.get((int)rc));
                            cb.closeComplete(-9, LedgerHandle.this, ctx);
                        } else {
                            LedgerHandle.this.metadata.updateZnodeStatus(stat);
                            cb.closeComplete(0, LedgerHandle.this, ctx);
                        }
                    }
                }, null);
            }
        });
    }

    public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry) throws InterruptedException, BKException {
        SyncCounter counter = new SyncCounter();
        counter.inc();
        this.asyncReadEntries(firstEntry, lastEntry, new SyncReadCallback(), counter);
        counter.block(0);
        if (counter.getrc() != 0) {
            throw BKException.create(counter.getrc());
        }
        return counter.getSequence();
    }

    public void asyncReadEntries(long firstEntry, long lastEntry, AsyncCallback.ReadCallback cb, Object ctx) {
        if (firstEntry < 0L || lastEntry > this.lastAddConfirmed || firstEntry > lastEntry) {
            cb.readComplete(-1, this, null, ctx);
            return;
        }
        try {
            new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
        }
        catch (InterruptedException e) {
            cb.readComplete(-15, this, null, ctx);
        }
    }

    public void addEntry(byte[] data) throws InterruptedException, BKException {
        this.addEntry(data, 0, data.length);
    }

    public void addEntry(byte[] data, int offset, int length) throws InterruptedException, BKException {
        LOG.debug("Adding entry " + data);
        SyncCounter counter = new SyncCounter();
        counter.inc();
        this.asyncAddEntry(data, offset, length, new SyncAddCallback(), counter);
        counter.block(0);
        if (counter.getrc() != 0) {
            throw BKException.create(counter.getrc());
        }
        if (counter.getrc() != 0) {
            throw BKException.create(counter.getrc());
        }
    }

    public void asyncAddEntry(byte[] data, AsyncCallback.AddCallback cb, Object ctx) {
        this.asyncAddEntry(data, 0, data.length, cb, ctx);
    }

    public void asyncAddEntry(byte[] data, int offset, int length, AsyncCallback.AddCallback cb, Object ctx) {
        PendingAddOp op = new PendingAddOp(this, cb, ctx);
        this.doAsyncAddEntry(op, data, offset, length, cb, ctx);
    }

    void asyncRecoveryAddEntry(byte[] data, int offset, int length, AsyncCallback.AddCallback cb, Object ctx) {
        PendingAddOp op = new PendingAddOp(this, cb, ctx).enableRecoveryAdd();
        this.doAsyncAddEntry(op, data, offset, length, cb, ctx);
    }

    private void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length, final AsyncCallback.AddCallback cb, final Object ctx) {
        if (offset < 0 || length < 0 || offset + length > data.length) {
            throw new ArrayIndexOutOfBoundsException("Invalid values for offset(" + offset + ") or length(" + length + ")");
        }
        try {
            this.opCounterSem.acquire();
        }
        catch (InterruptedException e) {
            cb.addComplete(-15, this, -1L, ctx);
        }
        try {
            this.bk.mainWorkerPool.submitOrdered(this.ledgerId, new SafeRunnable(){

                @Override
                public void safeRun() {
                    if (LedgerHandle.this.metadata.isClosed()) {
                        LOG.warn("Attempt to add to closed ledger: " + LedgerHandle.this.ledgerId);
                        LedgerHandle.this.opCounterSem.release();
                        cb.addComplete(-11, LedgerHandle.this, -1L, ctx);
                        return;
                    }
                    long entryId = ++LedgerHandle.this.lastAddPushed;
                    long currentLength = LedgerHandle.this.addToLength(length);
                    op.setEntryId(entryId);
                    LedgerHandle.this.pendingAddOps.add(op);
                    ChannelBuffer toSend = LedgerHandle.this.macManager.computeDigestAndPackageForSending(entryId, LedgerHandle.this.lastAddConfirmed, currentLength, data, offset, length);
                    op.initiate(toSend);
                }
            });
        }
        catch (RuntimeException e) {
            this.opCounterSem.release();
            throw e;
        }
    }

    public void asyncReadLastConfirmed(AsyncCallback.ReadLastConfirmedCallback cb, Object ctx) {
        new ReadLastConfirmedOp(this, cb, ctx).initiate();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long readLastConfirmed() throws InterruptedException, BKException {
        LastConfirmedCtx ctx = new LastConfirmedCtx();
        this.asyncReadLastConfirmed(new SyncReadLastConfirmedCallback(), ctx);
        LastConfirmedCtx lastConfirmedCtx = ctx;
        synchronized (lastConfirmedCtx) {
            while (!ctx.ready()) {
                ctx.wait();
            }
        }
        if (ctx.getRC() != 0) {
            throw BKException.create(ctx.getRC());
        }
        return ctx.getlastConfirmed();
    }

    void handleUnrecoverableErrorDuringAdd(int rc) {
        this.asyncCloseInternal(NoopCloseCallback.instance, null, rc);
    }

    void errorOutPendingAdds(int rc) {
        PendingAddOp pendingAddOp;
        while ((pendingAddOp = this.pendingAddOps.poll()) != null) {
            pendingAddOp.submitCallback(rc);
        }
    }

    void sendAddSuccessCallbacks() {
        PendingAddOp pendingAddOp;
        while ((pendingAddOp = this.pendingAddOps.peek()) != null) {
            if (pendingAddOp.numResponsesPending != 0) {
                return;
            }
            this.pendingAddOps.remove();
            this.lastAddConfirmed = pendingAddOp.entryId;
            pendingAddOp.submitCallback(0);
        }
    }

    void handleBookieFailure(InetSocketAddress addr, final int bookieIndex) {
        InetSocketAddress newBookie;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling failure of bookie: " + addr + " index: " + bookieIndex);
        }
        try {
            newBookie = this.bk.bookieWatcher.getAdditionalBookie(this.metadata.currentEnsemble);
        }
        catch (BKException.BKNotEnoughBookiesException e) {
            LOG.error("Could not get additional bookie to remake ensemble, closing ledger: " + this.ledgerId);
            this.handleUnrecoverableErrorDuringAdd(e.getCode());
            return;
        }
        final ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>(this.metadata.currentEnsemble);
        newEnsemble.set(bookieIndex, newBookie);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Changing ensemble from: " + this.metadata.currentEnsemble + " to: " + newEnsemble + " for ledger: " + this.ledgerId + " starting at entry: " + (this.lastAddConfirmed + 1L));
        }
        this.metadata.addEnsemble(this.lastAddConfirmed + 1L, newEnsemble);
        this.writeLedgerConfig(new AsyncCallback.StatCallback(){

            public void processResult(final int rc, String path, Object ctx, final Stat stat) {
                LedgerHandle.this.bk.mainWorkerPool.submitOrdered(LedgerHandle.this.ledgerId, new SafeRunnable(){

                    @Override
                    public void safeRun() {
                        if (rc != KeeperException.Code.OK.intValue()) {
                            LOG.error("Could not persist ledger metadata while changing ensemble to: " + newEnsemble + " , closing ledger");
                            LedgerHandle.this.handleUnrecoverableErrorDuringAdd(-9);
                            return;
                        }
                        LedgerHandle.this.metadata.updateZnodeStatus(stat);
                        for (PendingAddOp pendingAddOp : LedgerHandle.this.pendingAddOps) {
                            pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
                        }
                    }
                });
            }
        }, null);
    }

    void rereadMetadata(final BookkeeperInternalCallbacks.GenericCallback<Void> cb) {
        this.bk.getZkHandle().getData(this.bk.getLedgerManager().getLedgerPath(this.ledgerId), false, new AsyncCallback.DataCallback(){

            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                if (rc != KeeperException.Code.OK.intValue()) {
                    LOG.error("Error reading metadata from ledger, code =" + rc);
                    cb.operationComplete(-9, null);
                    return;
                }
                try {
                    LedgerHandle.this.metadata = LedgerMetadata.parseConfig(data, stat.getVersion());
                }
                catch (IOException e) {
                    LOG.error("Error parsing ledger metadata for ledger", (Throwable)e);
                    cb.operationComplete(-9, null);
                }
                cb.operationComplete(0, null);
            }
        }, null);
    }

    void recover(final BookkeeperInternalCallbacks.GenericCallback<Void> cb) {
        if (this.metadata.isClosed()) {
            this.lastAddConfirmed = this.lastAddPushed = this.metadata.close;
            this.length = this.metadata.length;
            cb.operationComplete(0, null);
            return;
        }
        this.metadata.markLedgerInRecovery();
        this.writeLedgerConfig(new AsyncCallback.StatCallback(){

            public void processResult(int rc, String path, Object ctx, Stat stat) {
                if (rc == -103) {
                    LedgerHandle.this.rereadMetadata(new BookkeeperInternalCallbacks.GenericCallback<Void>(){

                        @Override
                        public void operationComplete(int rc, Void result) {
                            if (rc != 0) {
                                cb.operationComplete(rc, null);
                            } else {
                                LedgerHandle.this.recover(cb);
                            }
                        }
                    });
                } else if (rc == KeeperException.Code.OK.intValue()) {
                    LedgerHandle.this.metadata.znodeVersion = stat.getVersion();
                    new LedgerRecoveryOp(LedgerHandle.this, cb).initiate();
                } else {
                    LOG.error("Error writing ledger config " + rc + " path = " + path);
                    cb.operationComplete(-9, null);
                }
            }
        }, null);
    }

    private static class SyncCloseCallback
    implements AsyncCallback.CloseCallback {
        private SyncCloseCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
            SyncCounter counter = (SyncCounter)ctx;
            counter.setrc(rc);
            SyncCounter syncCounter = counter;
            synchronized (syncCounter) {
                counter.dec();
                counter.notify();
            }
        }
    }

    private static class SyncReadLastConfirmedCallback
    implements AsyncCallback.ReadLastConfirmedCallback {
        private SyncReadLastConfirmedCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
            LastConfirmedCtx lcCtx;
            LastConfirmedCtx lastConfirmedCtx = lcCtx = (LastConfirmedCtx)ctx;
            synchronized (lastConfirmedCtx) {
                lcCtx.setRC(rc);
                lcCtx.setLastConfirmed(lastConfirmed);
                lcCtx.notify();
            }
        }
    }

    private static class SyncAddCallback
    implements AsyncCallback.AddCallback {
        private SyncAddCallback() {
        }

        @Override
        public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
            SyncCounter counter = (SyncCounter)ctx;
            counter.setrc(rc);
            counter.dec();
        }
    }

    private static class SyncReadCallback
    implements AsyncCallback.ReadCallback {
        private SyncReadCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
            SyncCounter counter;
            SyncCounter syncCounter = counter = (SyncCounter)ctx;
            synchronized (syncCounter) {
                counter.setSequence(seq);
                counter.setrc(rc);
                counter.dec();
                counter.notify();
            }
        }
    }

    static class NoopCloseCallback
    implements AsyncCallback.CloseCallback {
        static NoopCloseCallback instance = new NoopCloseCallback();

        NoopCloseCallback() {
        }

        @Override
        public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
            if (rc != KeeperException.Code.OK.intValue()) {
                LOG.warn("Close failed: " + BKException.getMessage(rc));
            }
        }
    }

    class LastConfirmedCtx {
        long response = -1L;
        int rc;

        LastConfirmedCtx() {
        }

        void setLastConfirmed(long lastConfirmed) {
            this.response = lastConfirmed;
        }

        long getlastConfirmed() {
            return this.response;
        }

        void setRC(int rc) {
            this.rc = rc;
        }

        int getRC() {
            return this.rc;
        }

        boolean ready() {
            return this.response != -1L;
        }
    }
}

