/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.consumer;

import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.MessageAccessor;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.FetchManager;
import com.taobao.metamorphosis.client.consumer.FetchRequest;
import com.taobao.metamorphosis.client.consumer.FetchRequestQueue;
import com.taobao.metamorphosis.client.consumer.InnerConsumer;
import com.taobao.metamorphosis.client.consumer.MessageIterator;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.InvalidMessageException;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.MetaStatLog;
import java.util.concurrent.RejectedExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class SimpleFetchManager
implements FetchManager {
    private volatile boolean shutdown = false;
    private Thread[] fetchRunners;
    private int fetchRequestCount;
    private FetchRequestQueue requestQueue;
    private final ConsumerConfig consumerConfig;
    private final InnerConsumer consumer;
    static final Log log = LogFactory.getLog(SimpleFetchManager.class);

    public SimpleFetchManager(ConsumerConfig consumerConfig, InnerConsumer consumer) {
        this.consumerConfig = consumerConfig;
        this.consumer = consumer;
    }

    @Override
    public boolean isShutdown() {
        return this.shutdown;
    }

    @Override
    public void stopFetchRunner() throws InterruptedException {
        this.shutdown = true;
        if (this.fetchRunners != null) {
            for (Thread thread : this.fetchRunners) {
                if (thread == null) continue;
                thread.interrupt();
                try {
                    thread.join(5000L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (this.requestQueue != null) {
            while (this.requestQueue.size() != this.fetchRequestCount) {
                Thread.sleep(50L);
            }
        }
    }

    @Override
    public void resetFetchState() {
        this.requestQueue = new FetchRequestQueue();
        this.fetchRunners = new Thread[this.consumerConfig.getFetchRunnerCount()];
        for (int i = 0; i < this.fetchRunners.length; ++i) {
            this.fetchRunners[i] = new Thread(new FetchRequestRunner());
            this.fetchRunners[i].setName(this.consumerConfig.getGroup() + "Fetch-Runner-" + i);
        }
    }

    @Override
    public void startFetchRunner() {
        this.fetchRequestCount = this.requestQueue.size();
        this.shutdown = false;
        for (Thread thread : this.fetchRunners) {
            thread.start();
        }
    }

    @Override
    public void addFetchRequest(FetchRequest request) {
        this.requestQueue.offer(request);
    }

    FetchRequest takeFetchRequest() throws InterruptedException {
        return this.requestQueue.take();
    }

    boolean isRetryTooMany(FetchRequest request) {
        return request.getRetries() > this.consumerConfig.getMaxFetchRetries();
    }

    boolean isRetryTooManyForIncrease(FetchRequest request) {
        return request.getRetries() > this.consumerConfig.getMaxIncreaseFetchDataRetries();
    }

    long getMaxDelayFetchTimeInMills() {
        return this.consumerConfig.getMaxDelayFetchTimeInMills();
    }

    class FetchRequestRunner
    implements Runnable {
        private static final int DELAY_NPARTS = 10;
        private long lastLogNoConnectionTime;

        FetchRequestRunner() {
        }

        @Override
        public void run() {
            while (!SimpleFetchManager.this.shutdown) {
                try {
                    FetchRequest request = SimpleFetchManager.this.requestQueue.take();
                    this.processRequest(request);
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        void processRequest(FetchRequest request) {
            try {
                MessageIterator iterator = SimpleFetchManager.this.consumer.fetch(request, -1L, null);
                MessageListener listener = SimpleFetchManager.this.consumer.getMessageListener(request.getTopic());
                this.notifyListener(request, iterator, listener);
            }
            catch (MetaClientException e) {
                this.updateDelay(request);
                this.LogAddRequest(request, e);
            }
            catch (InterruptedException e) {
                SimpleFetchManager.this.addFetchRequest(request);
            }
            catch (Throwable e) {
                this.updateDelay(request);
                this.LogAddRequest(request, e);
            }
        }

        private void LogAddRequest(FetchRequest request, Throwable e) {
            if (e instanceof MetaClientException && e.getCause() instanceof NotifyRemotingException && e.getMessage().contains("\u65e0\u53ef\u7528\u8fde\u63a5")) {
                long now = System.currentTimeMillis();
                if (this.lastLogNoConnectionTime <= 0L || now - this.lastLogNoConnectionTime > 30000L) {
                    log.error((Object)("\u83b7\u53d6\u6d88\u606f\u5931\u8d25,topic=" + request.getTopic() + ",partition=" + request.getPartition()), e);
                    this.lastLogNoConnectionTime = now;
                }
            } else {
                log.error((Object)("\u83b7\u53d6\u6d88\u606f\u5931\u8d25,topic=" + request.getTopic() + ",partition=" + request.getPartition()), e);
            }
            SimpleFetchManager.this.addFetchRequest(request);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void getOffsetAddRequest(FetchRequest request, InvalidMessageException e) {
            try {
                long newOffset = SimpleFetchManager.this.consumer.offset(request);
                request.resetRetries();
                request.setOffset(newOffset, request.getLastMessageId(), request.getPartitionObject().isAutoAck());
            }
            catch (MetaClientException ex) {
                log.error((Object)("\u67e5\u8be2offset\u5931\u8d25,topic=" + request.getTopic() + ",partition=" + request.getPartition()), (Throwable)e);
            }
            finally {
                SimpleFetchManager.this.addFetchRequest(request);
            }
        }

        private void notifyListener(final FetchRequest request, final MessageIterator it, final MessageListener listener) {
            if (listener != null) {
                if (listener.getExecutor() != null) {
                    try {
                        listener.getExecutor().execute(new Runnable(){

                            @Override
                            public void run() {
                                FetchRequestRunner.this.receiveMessages(request, it, listener);
                            }
                        });
                    }
                    catch (RejectedExecutionException e) {
                        log.error((Object)("MessageListener\u7ebf\u7a0b\u6c60\u7e41\u5fd9\uff0c\u65e0\u6cd5\u5904\u7406\u6d88\u606f,topic=" + request.getTopic() + ",partition=" + request.getPartition()), (Throwable)e);
                        SimpleFetchManager.this.addFetchRequest(request);
                    }
                } else {
                    this.receiveMessages(request, it, listener);
                }
            }
        }

        private void receiveMessages(FetchRequest request, MessageIterator it, MessageListener listener) {
            if (it != null && it.hasNext()) {
                if (this.processWhenRetryTooMany(request, it)) {
                    return;
                }
                Partition partition = request.getPartitionObject();
                if (this.processReceiveMessage(request, it, listener, partition)) {
                    return;
                }
                this.postReceiveMessage(request, it, partition);
            } else {
                if (SimpleFetchManager.this.isRetryTooManyForIncrease(request) && it != null && it.getDataLength() > 0) {
                    request.increaseMaxSize();
                    log.warn((Object)("\u8b66\u544a\uff0c\u7b2c" + request.getRetries() + "\u6b21\u65e0\u6cd5\u62c9\u53d6topic=" + request.getTopic() + ",partition=" + request.getPartitionObject() + "\u7684\u6d88\u606f\uff0c\u9012\u589emaxSize=" + request.getMaxSize() + " Bytes"));
                }
                if (it != null) {
                    request.incrementRetriesAndGet();
                }
                this.updateDelay(request);
                SimpleFetchManager.this.addFetchRequest(request);
            }
        }

        private boolean processReceiveMessage(FetchRequest request, MessageIterator it, MessageListener listener, Partition partition) {
            int count = 0;
            while (it.hasNext()) {
                int prevOffset = it.getOffset();
                try {
                    Message msg = it.next();
                    MessageAccessor.setPartition((Message)msg, (Partition)partition);
                    listener.recieveMessages(msg);
                    if (partition.isAutoAck()) {
                        ++count;
                        continue;
                    }
                    if (partition.isAcked()) {
                        ++count;
                        break;
                    }
                    if (partition.isRollback()) break;
                    ++count;
                }
                catch (InvalidMessageException e) {
                    MetaStatLog.addStat(null, (String)"cli_invalid_message", (String)request.getTopic());
                    this.getOffsetAddRequest(request, e);
                    return true;
                }
                catch (Throwable e) {
                    it.setOffset(prevOffset);
                    log.error((Object)("MessageListener\u5904\u7406\u6d88\u606f\u5f02\u5e38,topic=" + request.getTopic() + ",partition=" + request.getPartition()), e);
                    break;
                }
            }
            MetaStatLog.addStatValue2(null, (String)"cli_get_msg_count", (String)request.getTopic(), (long)count);
            return false;
        }

        private boolean processWhenRetryTooMany(FetchRequest request, MessageIterator it) {
            if (SimpleFetchManager.this.isRetryTooMany(request)) {
                try {
                    Message couldNotProecssMsg = it.next();
                    MessageAccessor.setPartition((Message)couldNotProecssMsg, (Partition)request.getPartitionObject());
                    MetaStatLog.addStat(null, (String)"cli_skip_msg_count", (String)couldNotProecssMsg.getTopic());
                    SimpleFetchManager.this.consumer.appendCouldNotProcessMessage(couldNotProecssMsg);
                }
                catch (InvalidMessageException e) {
                    MetaStatLog.addStat(null, (String)"cli_invalid_message", (String)request.getTopic());
                    this.getOffsetAddRequest(request, e);
                    return true;
                }
                catch (Throwable t) {
                    this.LogAddRequest(request, t);
                    return true;
                }
                request.resetRetries();
                request.setOffset(request.getOffset() + (long)it.getOffset(), it.getPrevMessage().getId(), true);
                request.setDelay(0L);
                SimpleFetchManager.this.addFetchRequest(request);
                return true;
            }
            return false;
        }

        private void postReceiveMessage(FetchRequest request, MessageIterator it, Partition partition) {
            if (it.getOffset() == 0) {
                request.incrementRetriesAndGet();
            } else {
                request.resetRetries();
            }
            if (!partition.isAutoAck()) {
                if (partition.isRollback()) {
                    request.rollbackOffset();
                    partition.reset();
                    this.addRequst(request);
                } else if (partition.isAcked()) {
                    partition.reset();
                    this.ackRequest(request, it, true);
                } else {
                    this.ackRequest(request, it, false);
                }
            } else {
                this.ackRequest(request, it, true);
            }
        }

        private void ackRequest(FetchRequest request, MessageIterator it, boolean ack) {
            request.setOffset(request.getOffset() + (long)it.getOffset(), it.getPrevMessage() != null ? it.getPrevMessage().getId() : -1L, ack);
            this.addRequst(request);
        }

        private void addRequst(FetchRequest request) {
            long delay = this.getRetryDelay(request);
            request.setDelay(delay);
            SimpleFetchManager.this.addFetchRequest(request);
        }

        private long getRetryDelay(FetchRequest request) {
            long maxDelayFetchTimeInMills = SimpleFetchManager.this.getMaxDelayFetchTimeInMills();
            long nPartsDelayTime = maxDelayFetchTimeInMills / 10L;
            long delay = nPartsDelayTime * (long)request.getRetries();
            if (delay > maxDelayFetchTimeInMills) {
                delay = maxDelayFetchTimeInMills;
            }
            return delay;
        }

        private void updateDelay(FetchRequest request) {
            long delay = this.getNextDelay(request);
            request.setDelay(delay);
        }

        private long getNextDelay(FetchRequest request) {
            long maxDelayFetchTimeInMills = SimpleFetchManager.this.getMaxDelayFetchTimeInMills();
            long nPartsDelayTime = maxDelayFetchTimeInMills / 10L;
            long delay = request.getDelay() + nPartsDelayTime;
            if (delay > maxDelayFetchTimeInMills) {
                delay = maxDelayFetchTimeInMills;
            }
            return delay;
        }
    }
}

