/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import java.util.Queue;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PendingReadOp
implements Enumeration<LedgerEntry>,
BookkeeperInternalCallbacks.ReadEntryCallback {
    Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
    Queue<LedgerEntry> seq;
    AsyncCallback.ReadCallback cb;
    Object ctx;
    LedgerHandle lh;
    long numPendingReads;
    long startEntryId;
    long endEntryId;

    PendingReadOp(LedgerHandle lh, long startEntryId, long endEntryId, AsyncCallback.ReadCallback cb, Object ctx) {
        this.seq = new ArrayDeque<LedgerEntry>((int)(endEntryId - startEntryId));
        this.cb = cb;
        this.ctx = ctx;
        this.lh = lh;
        this.startEntryId = startEntryId;
        this.endEntryId = endEntryId;
        this.numPendingReads = endEntryId - startEntryId + 1L;
    }

    public void initiate() throws InterruptedException {
        long nextEnsembleChange = this.startEntryId;
        long i = this.startEntryId;
        ArrayList<InetSocketAddress> ensemble = null;
        do {
            if (this.LOG.isDebugEnabled()) {
                this.LOG.debug("Acquiring lock: " + i);
            }
            this.lh.opCounterSem.acquire();
            if (i == nextEnsembleChange) {
                ensemble = this.lh.metadata.getEnsemble(i);
                nextEnsembleChange = this.lh.metadata.getNextEnsembleChange(i);
            }
            LedgerEntry entry = new LedgerEntry(this.lh.ledgerId, i);
            this.seq.add(entry);
            this.sendRead(ensemble, entry, -1);
        } while (++i <= this.endEntryId);
    }

    void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, int lastErrorCode) {
        if (entry.nextReplicaIndexToReadFrom >= this.lh.metadata.quorumSize) {
            this.submitCallback(lastErrorCode);
            return;
        }
        int bookieIndex = this.lh.distributionSchedule.getBookieIndex(entry.entryId, entry.nextReplicaIndexToReadFrom);
        ++entry.nextReplicaIndexToReadFrom;
        this.lh.bk.bookieClient.readEntry(ensemble.get(bookieIndex), this.lh.ledgerId, entry.entryId, this, entry, 0);
    }

    void logErrorAndReattemptRead(LedgerEntry entry, String errMsg, int rc) {
        ArrayList<InetSocketAddress> ensemble = this.lh.metadata.getEnsemble(entry.entryId);
        int bookeIndex = this.lh.distributionSchedule.getBookieIndex(entry.entryId, entry.nextReplicaIndexToReadFrom - 1);
        this.LOG.error(errMsg + " while reading entry: " + entry.entryId + " ledgerId: " + this.lh.ledgerId + " from bookie: " + ensemble.get(bookeIndex));
        this.sendRead(ensemble, entry, rc);
    }

    @Override
    public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) {
        ChannelBufferInputStream is;
        LedgerEntry entry = (LedgerEntry)ctx;
        if (rc != 0) {
            this.logErrorAndReattemptRead(entry, "Error: " + BKException.getMessage(rc), rc);
            return;
        }
        try {
            is = this.lh.macManager.verifyDigestAndReturnData(entryId, buffer);
        }
        catch (BKException.BKDigestMatchException e) {
            this.logErrorAndReattemptRead(entry, "Mac mismatch", -5);
            return;
        }
        entry.entryDataStream = is;
        entry.length = buffer.getLong(24);
        --this.numPendingReads;
        if (this.numPendingReads == 0L) {
            this.submitCallback(0);
        }
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("Releasing lock: " + entryId);
        }
        this.lh.opCounterSem.release();
        if (this.numPendingReads < 0L) {
            this.LOG.error("Read too many values");
        }
    }

    private void submitCallback(int code) {
        this.cb.readComplete(code, this.lh, this, this.ctx);
    }

    @Override
    public boolean hasMoreElements() {
        return !this.seq.isEmpty();
    }

    @Override
    public LedgerEntry nextElement() throws NoSuchElementException {
        return this.seq.remove();
    }

    public int size() {
        return this.seq.size();
    }
}

