/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.producer;

import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.ZkClientChangedListener;
import com.taobao.metamorphosis.client.producer.PartitionSelector;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.MetaZookeeper;
import com.taobao.metamorphosis.utils.ZkUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ProducerZooKeeper
implements ZkClientChangedListener {
    private final RemotingClientWrapper remotingClient;
    private final ConcurrentHashMap<String, FutureTask<BrokerConnectionListener>> topicConnectionListeners = new ConcurrentHashMap();
    private final MetaClientConfig metaClientConfig;
    private ZkClient zkClient;
    private final MetaZookeeper metaZookeeper;
    private String defaultTopic;
    static final Log log = LogFactory.getLog(ProducerZooKeeper.class);

    public ProducerZooKeeper(MetaZookeeper metaZookeeper, RemotingClientWrapper remotingClient, ZkClient zkClient, MetaClientConfig metaClientConfig) {
        this.metaZookeeper = metaZookeeper;
        this.remotingClient = remotingClient;
        this.zkClient = zkClient;
        this.metaClientConfig = metaClientConfig;
    }

    public void publishTopic(final String topic, final Object ref) {
        if (this.topicConnectionListeners.get(topic) != null) {
            this.addRef(topic, ref);
            return;
        }
        FutureTask<BrokerConnectionListener> task = new FutureTask<BrokerConnectionListener>(new Callable<BrokerConnectionListener>(){

            @Override
            public BrokerConnectionListener call() throws Exception {
                BrokerConnectionListener listener = new BrokerConnectionListener(topic);
                if (ProducerZooKeeper.this.zkClient != null) {
                    ProducerZooKeeper.this.publishTopicInternal(topic, listener);
                }
                listener.references.add(ref);
                return listener;
            }
        });
        FutureTask<BrokerConnectionListener> existsTask = this.topicConnectionListeners.putIfAbsent(topic, task);
        if (existsTask == null) {
            task.run();
        } else {
            this.addRef(topic, ref);
        }
    }

    private void addRef(String topic, Object ref) {
        BrokerConnectionListener listener = this.getBrokerConnectionListener(topic);
        if (!listener.references.contains(ref)) {
            listener.references.add(ref);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unPublishTopic(String topic, Object ref) {
        BrokerConnectionListener listener = this.getBrokerConnectionListener(topic);
        if (listener != null) {
            Set<Object> set = listener.references;
            synchronized (set) {
                if (this.getBrokerConnectionListener(topic) == null) {
                    return;
                }
                listener.references.remove(ref);
                if (listener.references.isEmpty()) {
                    this.topicConnectionListeners.remove(topic);
                    listener.dispose();
                }
            }
        }
    }

    private void publishTopicInternal(String topic, BrokerConnectionListener listener) throws Exception, NotifyRemotingException, InterruptedException {
        String partitionPath = this.metaZookeeper.brokerTopicsPubPath + "/" + topic;
        ZkUtils.makeSurePersistentPathExists((ZkClient)this.zkClient, (String)partitionPath);
        this.zkClient.subscribeChildChanges(partitionPath, (IZkChildListener)listener);
        listener.syncedUpdateBrokersInfo();
    }

    BrokerConnectionListener getBrokerConnectionListener(String topic) {
        FutureTask<BrokerConnectionListener> task = this.topicConnectionListeners.get(topic);
        if (task != null) {
            try {
                return task.get();
            }
            catch (Exception e) {
                log.error((Object)"\u83b7\u53d6BrokerConnectionListener\u5931\u8d25", (Throwable)e);
                return null;
            }
        }
        return null;
    }

    Set<String> getServerUrlSetByTopic(String topic) {
        BrokerConnectionListener brokerConnectionListener = this.getBrokerConnectionListener(topic);
        if (brokerConnectionListener != null) {
            BrokersInfo info = brokerConnectionListener.brokersInfo;
            Map<Integer, String> brokerStringMap = info.oldBrokerStringMap;
            Map<String, List<Partition>> topicPartitionMap = info.oldTopicPartitionMap;
            List<Partition> plist = topicPartitionMap.get(topic);
            if (plist != null) {
                HashSet<String> result = new HashSet<String>();
                for (Partition partition : plist) {
                    int brokerId = partition.getBrokerId();
                    String url = brokerStringMap.get(brokerId);
                    if (url == null) continue;
                    result.add(url);
                }
                return result;
            }
        }
        return Collections.emptySet();
    }

    public synchronized void setDefaultTopic(String topic, Object ref) {
        if (this.defaultTopic != null && !this.defaultTopic.equals(topic)) {
            throw new IllegalStateException("Default topic has been setup already:" + this.defaultTopic);
        }
        this.defaultTopic = topic;
        this.publishTopic(topic, ref);
    }

    Partition selectPartition(String topic, Message msg, PartitionSelector selector, String serverUrl) throws MetaClientException {
        BrokerConnectionListener brokerConnectionListener = this.getBrokerConnectionListener(topic);
        if (brokerConnectionListener != null) {
            BrokersInfo brokersInfo = brokerConnectionListener.brokersInfo;
            List<Partition> partitions = brokersInfo.oldTopicPartitionMap.get(topic);
            Map<Integer, String> brokerStringMap = brokersInfo.oldBrokerStringMap;
            ArrayList<Partition> partitionsForSelect = new ArrayList<Partition>();
            for (Partition partition : partitions) {
                if (!serverUrl.equals(brokerStringMap.get(partition.getBrokerId()))) continue;
                partitionsForSelect.add(partition);
            }
            return selector.getPartition(topic, partitionsForSelect, msg);
        }
        return this.selectDefaultPartition(topic, msg, selector, serverUrl);
    }

    public String selectBroker(String topic, Partition partition) {
        if (this.metaClientConfig.getServerUrl() != null) {
            return this.metaClientConfig.getServerUrl();
        }
        if (partition != null) {
            BrokerConnectionListener brokerConnectionListener = this.getBrokerConnectionListener(topic);
            if (brokerConnectionListener != null) {
                BrokersInfo brokersInfo = brokerConnectionListener.brokersInfo;
                return brokersInfo.oldBrokerStringMap.get(partition.getBrokerId());
            }
            return this.selectDefaultBroker(topic, partition);
        }
        return null;
    }

    private String selectDefaultBroker(String topic, Partition partition) {
        if (this.defaultTopic == null) {
            return null;
        }
        BrokerConnectionListener brokerConnectionListener = this.getBrokerConnectionListener(this.defaultTopic);
        if (brokerConnectionListener != null) {
            BrokersInfo brokersInfo = brokerConnectionListener.brokersInfo;
            return brokersInfo.oldBrokerStringMap.get(partition.getBrokerId());
        }
        return null;
    }

    public Partition selectPartition(String topic, Message message, PartitionSelector partitionSelector) throws MetaClientException {
        if (this.metaClientConfig.getServerUrl() != null) {
            return Partition.RandomPartiton;
        }
        BrokerConnectionListener brokerConnectionListener = this.getBrokerConnectionListener(topic);
        if (brokerConnectionListener != null) {
            BrokersInfo brokersInfo = brokerConnectionListener.brokersInfo;
            return partitionSelector.getPartition(topic, brokersInfo.oldTopicPartitionMap.get(topic), message);
        }
        return this.selectDefaultPartition(topic, message, partitionSelector, null);
    }

    private Partition selectDefaultPartition(String topic, Message message, PartitionSelector partitionSelector, String serverUrl) throws MetaClientException {
        if (this.defaultTopic == null) {
            return null;
        }
        BrokerConnectionListener brokerConnectionListener = this.getBrokerConnectionListener(this.defaultTopic);
        if (brokerConnectionListener != null) {
            BrokersInfo brokersInfo = brokerConnectionListener.brokersInfo;
            if (serverUrl == null) {
                return partitionSelector.getPartition(this.defaultTopic, brokersInfo.oldTopicPartitionMap.get(this.defaultTopic), message);
            }
            List<Partition> partitions = brokersInfo.oldTopicPartitionMap.get(this.defaultTopic);
            Map<Integer, String> brokerStringMap = brokersInfo.oldBrokerStringMap;
            ArrayList<Partition> partitionsForSelect = new ArrayList<Partition>();
            for (Partition partition : partitions) {
                if (!serverUrl.equals(brokerStringMap.get(partition.getBrokerId()))) continue;
                partitionsForSelect.add(partition);
            }
            return partitionSelector.getPartition(this.defaultTopic, partitionsForSelect, message);
        }
        return null;
    }

    @Override
    public void onZkClientChanged(ZkClient newClient) {
        this.zkClient = newClient;
        try {
            for (String topic : this.topicConnectionListeners.keySet()) {
                log.info((Object)("re-publish topic to zk,topic=" + topic));
                this.publishTopicInternal(topic, this.getBrokerConnectionListener(topic));
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            log.error((Object)"\u91cd\u65b0\u8bbe\u7f6ezKClient\u5931\u8d25", (Throwable)e);
        }
    }

    final class BrokerConnectionListener
    implements IZkChildListener {
        final Lock lock = new ReentrantLock();
        volatile BrokersInfo brokersInfo = new BrokersInfo(new TreeMap<Integer, String>(), new HashMap<String, List<Partition>>());
        final String topic;
        final Set<Object> references = Collections.synchronizedSet(new HashSet());

        public BrokerConnectionListener(String topic) {
            this.topic = topic;
        }

        void dispose() {
            String partitionPath = ((ProducerZooKeeper)ProducerZooKeeper.this).metaZookeeper.brokerTopicsPubPath + "/" + this.topic;
            ProducerZooKeeper.this.zkClient.unsubscribeChildChanges(partitionPath, (IZkChildListener)this);
        }

        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            this.syncedUpdateBrokersInfo();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void syncedUpdateBrokersInfo() throws NotifyRemotingException, InterruptedException {
            this.lock.lock();
            try {
                Map newBrokerStringMap = ProducerZooKeeper.this.metaZookeeper.getMasterBrokersByTopic(this.topic);
                ArrayList<String> topics = new ArrayList<String>(1);
                topics.add(this.topic);
                Map newTopicPartitionMap = ProducerZooKeeper.this.metaZookeeper.getPartitionsForTopicsFromMaster(topics);
                log.warn((Object)("Begin receiving broker changes for topic " + this.topic + ",broker ids:" + newTopicPartitionMap));
                for (Map.Entry<Integer, String> entry : this.brokersInfo.oldBrokerStringMap.entrySet()) {
                    Integer oldBrokerId = entry.getKey();
                    String oldBrokerString = entry.getValue();
                    String newBrokerString = (String)newBrokerStringMap.get(oldBrokerId);
                    if (newBrokerStringMap.containsKey(oldBrokerId)) {
                        if (newBrokerString.equals(oldBrokerString)) continue;
                        ProducerZooKeeper.this.remotingClient.close(oldBrokerString, false);
                        ProducerZooKeeper.this.remotingClient.connect(newBrokerString);
                        ProducerZooKeeper.this.remotingClient.awaitReadyInterrupt(newBrokerString);
                        log.warn((Object)("Close " + oldBrokerString + ",connect to " + newBrokerString));
                        continue;
                    }
                    ProducerZooKeeper.this.remotingClient.close(oldBrokerString, false);
                    log.warn((Object)("Close " + oldBrokerString));
                }
                for (Map.Entry<Integer, String> entry : newBrokerStringMap.entrySet()) {
                    Integer newBrokerId = entry.getKey();
                    String newBrokerString = entry.getValue();
                    if (this.brokersInfo.oldBrokerStringMap.containsKey(newBrokerId)) continue;
                    ProducerZooKeeper.this.remotingClient.connect(newBrokerString);
                    ProducerZooKeeper.this.remotingClient.awaitReadyInterrupt(newBrokerString);
                    log.warn((Object)("Connect to " + newBrokerString));
                }
                this.brokersInfo = new BrokersInfo(newBrokerStringMap, newTopicPartitionMap);
                log.warn((Object)("End receiving broker changes for topic " + this.topic));
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    public static class BrokersInfo {
        final Map<Integer, String> oldBrokerStringMap;
        final Map<String, List<Partition>> oldTopicPartitionMap;

        public BrokersInfo(Map<Integer, String> oldBrokerStringMap, Map<String, List<Partition>> oldTopicPartitionMap) {
            this.oldBrokerStringMap = oldBrokerStringMap;
            this.oldTopicPartitionMap = oldTopicPartitionMap;
        }
    }
}

