/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.consumer;

import com.taobao.gecko.core.command.RequestCommand;
import com.taobao.gecko.core.command.ResponseCommand;
import com.taobao.gecko.core.util.ConcurrentHashSet;
import com.taobao.gecko.core.util.OpaqueGenerator;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.ConsumerZooKeeper;
import com.taobao.metamorphosis.client.consumer.FetchManager;
import com.taobao.metamorphosis.client.consumer.FetchRequest;
import com.taobao.metamorphosis.client.consumer.InnerConsumer;
import com.taobao.metamorphosis.client.consumer.LoadBalanceStrategy;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageIterator;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.client.consumer.RecoverManager;
import com.taobao.metamorphosis.client.consumer.SimpleFetchManager;
import com.taobao.metamorphosis.client.consumer.SubscribeInfoManager;
import com.taobao.metamorphosis.client.consumer.SubscriberInfo;
import com.taobao.metamorphosis.client.consumer.Subscription;
import com.taobao.metamorphosis.client.consumer.TopicPartitionRegInfo;
import com.taobao.metamorphosis.client.consumer.storage.OffsetStorage;
import com.taobao.metamorphosis.client.producer.ProducerZooKeeper;
import com.taobao.metamorphosis.cluster.Broker;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.exception.MetaOpeartionTimeoutException;
import com.taobao.metamorphosis.network.BooleanCommand;
import com.taobao.metamorphosis.network.DataCommand;
import com.taobao.metamorphosis.network.GetCommand;
import com.taobao.metamorphosis.network.OffsetCommand;
import com.taobao.metamorphosis.utils.MetaStatLog;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class SimpleMessageConsumer
implements MessageConsumer,
InnerConsumer {
    private static final int DEFAULT_OP_TIMEOUT = 10000;
    static final Log log = LogFactory.getLog(SimpleFetchManager.FetchRequestRunner.class);
    private final RemotingClientWrapper remotingClient;
    private final ConsumerConfig consumerConfig;
    private final ConsumerZooKeeper consumerZooKeeper;
    private final MetaMessageSessionFactory messageSessionFactory;
    private final OffsetStorage offsetStorage;
    private final LoadBalanceStrategy loadBalanceStrategy;
    private final ProducerZooKeeper producerZooKeeper;
    private final ScheduledExecutorService scheduledExecutorService;
    private final SubscribeInfoManager subscribeInfoManager;
    private final RecoverManager recoverStorageManager;
    private final ConcurrentHashMap<String, SubscriberInfo> topicSubcriberRegistry = new ConcurrentHashMap();
    private FetchManager fetchManager;
    private final ConcurrentHashSet<String> publishedTopics = new ConcurrentHashSet();

    public SimpleMessageConsumer(MetaMessageSessionFactory messageSessionFactory, RemotingClientWrapper remotingClient, ConsumerConfig consumerConfig, ConsumerZooKeeper consumerZooKeeper, ProducerZooKeeper producerZooKeeper, SubscribeInfoManager subscribeInfoManager, RecoverManager recoverManager, OffsetStorage offsetStorage, LoadBalanceStrategy loadBalanceStrategy) {
        this.messageSessionFactory = messageSessionFactory;
        this.remotingClient = remotingClient;
        this.consumerConfig = consumerConfig;
        this.producerZooKeeper = producerZooKeeper;
        this.consumerZooKeeper = consumerZooKeeper;
        this.offsetStorage = offsetStorage;
        this.subscribeInfoManager = subscribeInfoManager;
        this.recoverStorageManager = recoverManager;
        this.fetchManager = new SimpleFetchManager(consumerConfig, this);
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.loadBalanceStrategy = loadBalanceStrategy;
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                SimpleMessageConsumer.this.consumerZooKeeper.commitOffsets(SimpleMessageConsumer.this.fetchManager);
            }
        }, consumerConfig.getCommitOffsetPeriodInMills(), consumerConfig.getCommitOffsetPeriodInMills(), TimeUnit.MILLISECONDS);
    }

    public MetaMessageSessionFactory getParent() {
        return this.messageSessionFactory;
    }

    FetchManager getFetchManager() {
        return this.fetchManager;
    }

    void setFetchManager(FetchManager fetchManager) {
        this.fetchManager = fetchManager;
    }

    ConcurrentHashMap<String, SubscriberInfo> getTopicSubcriberRegistry() {
        return this.topicSubcriberRegistry;
    }

    @Override
    public OffsetStorage getOffsetStorage() {
        return this.offsetStorage;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void shutdown() throws MetaClientException {
        if (this.fetchManager.isShutdown()) {
            return;
        }
        try {
            this.fetchManager.stopFetchRunner();
            this.consumerZooKeeper.unRegisterConsumer(this.fetchManager);
            for (String topic : this.publishedTopics) {
                this.producerZooKeeper.unPublishTopic(topic, this);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            this.scheduledExecutorService.shutdownNow();
            this.offsetStorage.close();
            this.subscribeInfoManager.removeGroup(this.consumerConfig.getGroup());
            this.messageSessionFactory.removeChild(this);
        }
    }

    @Override
    public MessageConsumer subscribe(String topic, int maxSize, MessageListener messageListener) throws MetaClientException {
        this.checkState();
        if (StringUtils.isBlank((String)topic)) {
            throw new IllegalArgumentException("Blank topic");
        }
        if (messageListener == null) {
            throw new IllegalArgumentException("Null messageListener");
        }
        this.subscribeInfoManager.subscribe(topic, this.consumerConfig.getGroup(), maxSize, messageListener);
        SubscriberInfo info = this.topicSubcriberRegistry.get(topic);
        if (info == null) {
            info = new SubscriberInfo(messageListener, maxSize);
            SubscriberInfo oldInfo = this.topicSubcriberRegistry.putIfAbsent(topic, info);
            if (oldInfo != null) {
                throw new MetaClientException("Topic=" + topic + " has been subscribered");
            }
            return this;
        }
        throw new MetaClientException("Topic=" + topic + " has been subscribered");
    }

    @Override
    public void appendCouldNotProcessMessage(Message message) throws IOException {
        log.warn((Object)("Message could not process,save to local.MessageId=" + message.getId() + ",Topic=" + message.getTopic() + ",Partition=" + message.getPartition()));
        this.recoverStorageManager.append(this.consumerConfig.getGroup(), message);
    }

    private void checkState() {
        if (this.fetchManager.isShutdown()) {
            throw new IllegalStateException("Consumer has been shutdown");
        }
    }

    @Override
    public void completeSubscribe() throws MetaClientException {
        this.checkState();
        try {
            this.consumerZooKeeper.registerConsumer(this.consumerConfig, this.fetchManager, this.topicSubcriberRegistry, this.offsetStorage, this.loadBalanceStrategy);
        }
        catch (Exception e) {
            throw new MetaClientException("\u6ce8\u518c\u8ba2\u9605\u8005\u5931\u8d25", (Throwable)e);
        }
    }

    @Override
    public MessageListener getMessageListener(String topic) {
        SubscriberInfo info = this.topicSubcriberRegistry.get(topic);
        if (info == null) {
            return null;
        }
        return info.getMessageListener();
    }

    @Override
    public long offset(FetchRequest fetchRequest) throws MetaClientException {
        long start = System.currentTimeMillis();
        boolean success = false;
        try {
            long currentOffset = fetchRequest.getOffset();
            OffsetCommand offsetCmd = new OffsetCommand(fetchRequest.getTopic(), this.consumerConfig.getGroup(), fetchRequest.getPartition(), currentOffset, Integer.valueOf(OpaqueGenerator.getNextOpaque()));
            String serverUrl = fetchRequest.getBroker().getZKString();
            BooleanCommand booleanCmd = (BooleanCommand)this.remotingClient.invokeToGroup(serverUrl, (RequestCommand)offsetCmd, this.consumerConfig.getFetchTimeoutInMills(), TimeUnit.MILLISECONDS);
            switch (booleanCmd.getCode()) {
                case 200: {
                    success = true;
                    long l = Long.parseLong(booleanCmd.getErrorMsg());
                    return l;
                }
            }
            try {
                throw new MetaClientException(booleanCmd.getErrorMsg());
            }
            catch (MetaClientException e) {
                throw e;
            }
            catch (TimeoutException e) {
                throw new MetaOpeartionTimeoutException("Send message timeout in " + this.consumerConfig.getFetchTimeoutInMills() + " mills");
            }
            catch (Exception e) {
                throw new MetaClientException("get offset failed,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition() + ",current offset=" + fetchRequest.getOffset(), (Throwable)e);
            }
        }
        finally {
            long duration = System.currentTimeMillis() - start;
            if (duration > 200L) {
                MetaStatLog.addStatValue2(null, (String)"cli_offset_timeout", (String)fetchRequest.getTopic(), (long)duration);
            }
            if (!success) {
                MetaStatLog.addStat(null, (String)"cli_offset_failed", (String)fetchRequest.getTopic());
            }
        }
    }

    @Override
    public MessageIterator fetch(FetchRequest fetchRequest, long timeout, TimeUnit timeUnit) throws MetaClientException, InterruptedException {
        if (timeout <= 0L || timeUnit == null) {
            timeout = this.consumerConfig.getFetchTimeoutInMills();
            timeUnit = TimeUnit.MILLISECONDS;
        }
        long start = System.currentTimeMillis();
        boolean success = false;
        try {
            long currentOffset = fetchRequest.getOffset();
            GetCommand getCmd = new GetCommand(fetchRequest.getTopic(), this.consumerConfig.getGroup(), fetchRequest.getPartition(), currentOffset, fetchRequest.getMaxSize(), Integer.valueOf(OpaqueGenerator.getNextOpaque()));
            String serverUrl = fetchRequest.getBroker().getZKString();
            ResponseCommand response = this.remotingClient.invokeToGroup(serverUrl, (RequestCommand)getCmd, timeout, timeUnit);
            if (response instanceof DataCommand) {
                DataCommand dataCmd = (DataCommand)response;
                byte[] data = dataCmd.getData();
                if (data.length < fetchRequest.getMaxSize() / 2) {
                    fetchRequest.decreaseMaxSize();
                }
                success = true;
                MessageIterator messageIterator = new MessageIterator(fetchRequest.getTopic(), data);
                return messageIterator;
            }
            BooleanCommand booleanCmd = (BooleanCommand)response;
            switch (booleanCmd.getCode()) {
                case 404: {
                    success = true;
                    MessageIterator messageIterator = null;
                    return messageIterator;
                }
                case 403: {
                    success = true;
                    MessageIterator messageIterator = null;
                    return messageIterator;
                }
                case 301: {
                    success = true;
                    fetchRequest.resetRetries();
                    fetchRequest.setOffset(Long.parseLong(booleanCmd.getErrorMsg()), -1L, true);
                    MessageIterator messageIterator = null;
                    return messageIterator;
                }
            }
            try {
                throw new MetaClientException(((BooleanCommand)response).getErrorMsg());
            }
            catch (TimeoutException e) {
                throw new MetaOpeartionTimeoutException("Send message timeout in " + this.consumerConfig.getFetchTimeoutInMills() + " mills");
            }
            catch (MetaClientException e) {
                throw e;
            }
            catch (InterruptedException e) {
                throw e;
            }
            catch (Exception e) {
                throw new MetaClientException("get message failed,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition() + ",offset=" + fetchRequest.getOffset(), (Throwable)e);
            }
        }
        finally {
            long duration = System.currentTimeMillis() - start;
            if (duration > 200L) {
                MetaStatLog.addStatValue2(null, (String)"cli_get_timeout", (String)fetchRequest.getTopic(), (long)duration);
            }
            if (!success) {
                MetaStatLog.addStat(null, (String)"cli_get_failed", (String)fetchRequest.getTopic());
            }
        }
    }

    @Override
    public void setSubscriptions(Collection<Subscription> subscriptions) throws MetaClientException {
        if (subscriptions == null) {
            return;
        }
        for (Subscription subscription : subscriptions) {
            this.subscribe(subscription.getTopic(), subscription.getMaxSize(), subscription.getMessageListener());
        }
    }

    @Override
    public MessageIterator get(String topic, Partition partition, long offset, int maxSize, long timeout, TimeUnit timeUnit) throws MetaClientException, InterruptedException {
        if (!this.publishedTopics.contains((Object)topic)) {
            this.producerZooKeeper.publishTopic(topic, this);
            this.publishedTopics.add((Object)topic);
        }
        Broker broker = new Broker(partition.getBrokerId(), this.producerZooKeeper.selectBroker(topic, partition));
        TopicPartitionRegInfo topicPartitionRegInfo = new TopicPartitionRegInfo(topic, partition, offset);
        return this.fetch(new FetchRequest(broker, 0L, topicPartitionRegInfo, maxSize), timeout, timeUnit);
    }

    @Override
    public ConsumerConfig getConsumerConfig() {
        return this.consumerConfig;
    }

    @Override
    public MessageIterator get(String topic, Partition partition, long offset, int maxSize) throws MetaClientException, InterruptedException {
        return this.get(topic, partition, offset, maxSize, 10000L, TimeUnit.MILLISECONDS);
    }
}

