/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.extension.producer;

import com.taobao.common.store.Store;
import com.taobao.common.store.journal.JournalStore;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.GetRecoverStorageErrorException;
import com.taobao.metamorphosis.exception.UnknowCodecTypeException;
import com.taobao.metamorphosis.utils.IdWorker;
import com.taobao.metamorphosis.utils.NamedThreadFactory;
import com.taobao.metamorphosis.utils.codec.Deserializer;
import com.taobao.metamorphosis.utils.codec.Serializer;
import com.taobao.metamorphosis.utils.codec.impl.Hessian1Deserializer;
import com.taobao.metamorphosis.utils.codec.impl.Hessian1Serializer;
import com.taobao.metamorphosis.utils.codec.impl.JavaDeserializer;
import com.taobao.metamorphosis.utils.codec.impl.JavaSerializer;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LocalMessageStorageManager
implements MessageRecoverManager {
    protected static final String SPLIT = "@";
    protected final ConcurrentHashMap<String, FutureTask<Store>> topicStoreMap = new ConcurrentHashMap();
    protected final ConcurrentHashMap<String, FutureTask<Boolean>> topicRecoverTaskMap = new ConcurrentHashMap();
    private final Serializer serializer;
    protected final Deserializer deserializer;
    static final Log log = LogFactory.getLog(LocalMessageStorageManager.class);
    private final IdWorker idWorker = new IdWorker(0L);
    public static final String DEFAULT_META_LOCALMESSAGE_PATH = System.getProperty("meta.localmessage.path", System.getProperty("user.home") + File.separator + ".meta_localmessage");
    public String META_LOCALMESSAGE_PATH;
    private final String META_LOCALMESSAGE_CODEC_TYPE = System.getProperty("meta.localmessage.codec", "java");
    protected final ThreadPoolExecutor threadPoolExecutor;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    protected MessageRecoverManager.MessageRecoverer messageRecoverer;

    public LocalMessageStorageManager(MetaClientConfig metaClientConfig) {
        this(metaClientConfig, DEFAULT_META_LOCALMESSAGE_PATH, null);
    }

    public LocalMessageStorageManager(MetaClientConfig metaClientConfig, String path, MessageRecoverManager.MessageRecoverer messageRecoverer) {
        this.META_LOCALMESSAGE_PATH = StringUtils.isNotBlank((String)path) ? path : DEFAULT_META_LOCALMESSAGE_PATH;
        this.messageRecoverer = messageRecoverer;
        if (this.META_LOCALMESSAGE_CODEC_TYPE.equals("java")) {
            this.serializer = new JavaSerializer();
            this.deserializer = new JavaDeserializer();
        } else if (this.META_LOCALMESSAGE_CODEC_TYPE.equals("hessian1")) {
            this.serializer = new Hessian1Serializer();
            this.deserializer = new Hessian1Deserializer();
        } else {
            throw new UnknowCodecTypeException(this.META_LOCALMESSAGE_CODEC_TYPE);
        }
        this.threadPoolExecutor = new ThreadPoolExecutor(metaClientConfig.getRecoverThreadCount(), metaClientConfig.getRecoverThreadCount(), 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), (ThreadFactory)new NamedThreadFactory("SendRecover-thread"), new ThreadPoolExecutor.CallerRunsPolicy());
        this.makeDataDir();
        this.loadStores();
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                log.info((Object)"\u5f00\u59cb\u5c1d\u8bd5\u53d1\u9001\u672c\u5730\u7f13\u5b58\u7684\u6d88\u606f...");
                LocalMessageStorageManager.this.recover();
            }
        }, 0L, metaClientConfig.getRecoverMessageIntervalInMills(), TimeUnit.MILLISECONDS);
    }

    private void loadStores() {
        File[] files;
        File dataPath = new File(this.META_LOCALMESSAGE_PATH);
        for (File subFile : files = dataPath.listFiles()) {
            String name;
            String[] tmps;
            if (!subFile.isDirectory() || (tmps = (name = subFile.getName()).split(SPLIT)).length != 2) continue;
            log.info((Object)("\u52a0\u8f7dlocal message storage " + name + " ..."));
            this.getOrCreateStore(tmps[0], new Partition(tmps[1]));
        }
    }

    @Override
    public void recover() {
        Set names = this.topicStoreMap.keySet();
        if (names == null || names.size() == 0) {
            log.info((Object)"SendRecover\u6ca1\u6709\u9700\u8981\u6062\u590d\u7684\u6d88\u606f");
            return;
        }
        if (this.messageRecoverer != null) {
            for (String name : names) {
                String[] tmps = name.split(SPLIT);
                String topic = tmps[0];
                Partition partition = new Partition(tmps[1]);
                int count = this.getMessageCount(topic, partition);
                log.info((Object)(name + "\u9700\u8981\u6062\u590d\u7684\u6761\u6570:" + count));
                if (count <= 0 || this.recover(topic, partition, this.messageRecoverer)) continue;
                log.info((Object)("SendRecover\u53d1\u9001\u6062\u590d\u4efb\u52a1\u6b63\u5728\u8fd0\u884c,\u4e0d\u9700\u8981\u91cd\u65b0\u542f\u52a8,name=" + name));
            }
        } else {
            log.warn((Object)"messageRecoverer\u8fd8\u672a\u8bbe\u7f6e");
        }
    }

    @Override
    public boolean recover(final String topic, final Partition partition, final MessageRecoverManager.MessageRecoverer recoverer) {
        FutureTask<Boolean> recoverTask;
        final String name = this.generateKey(topic, partition);
        FutureTask<Boolean> ret = this.topicRecoverTaskMap.putIfAbsent(name, recoverTask = new FutureTask<Boolean>(new Callable<Boolean>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Boolean call() throws Exception {
                AtomicLong count = new AtomicLong(0L);
                try {
                    Store store = LocalMessageStorageManager.this.getOrCreateStore(topic, partition);
                    this.innerRecover(store, recoverer, count, name);
                }
                catch (Throwable e) {
                    log.error((Object)("SendRecover\u53d1\u9001\u6d88\u606f\u6062\u590d\u5931\u8d25,name=" + name), e);
                }
                finally {
                    log.info((Object)("SendRecover\u6267\u884c\u5b8c\u6bd5\u79fb\u9664\u53d1\u9001\u6062\u590d\u4efb\u52a1,name=" + name + ",\u6062\u590d\u6d88\u606f" + count.get() + "\u6761"));
                    LocalMessageStorageManager.this.topicRecoverTaskMap.remove(name);
                }
                return true;
            }

            private void innerRecover(Store store, MessageRecoverManager.MessageRecoverer recoverer2, AtomicLong count, String name2) throws IOException, Exception {
                for (byte[] key : store) {
                    Message msg = (Message)LocalMessageStorageManager.this.deserializer.decodeObject(store.get(key));
                    recoverer2.handle(msg);
                    try {
                        store.remove(key);
                        count.incrementAndGet();
                        if (count.get() % 20000L != 0L) continue;
                        log.info((Object)("SendRecover " + name2 + "\u5df2\u6062\u590d\u6d88\u606f\u6761\u6570:" + count.get()));
                    }
                    catch (IOException e) {
                        log.error((Object)"SendRecover remove message failed", (Throwable)e);
                    }
                }
            }
        }));
        if (ret == null) {
            this.threadPoolExecutor.submit(recoverTask);
            return true;
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("SendRecover\u53d1\u9001\u6062\u590d\u4efb\u52a1\u6b63\u5728\u8fd0\u884c,\u4e0d\u9700\u8981\u91cd\u65b0\u542f\u52a8,name=" + name));
        }
        return false;
    }

    protected Store getOrCreateStore(String topic, Partition partition) {
        this.getOrCreateStore0(topic, Partition.RandomPartiton);
        return this.getOrCreateStore0(topic, partition);
    }

    private Store getOrCreateStore0(String topic, Partition partition) {
        final String name = this.generateKey(topic, partition);
        FutureTask<Store> task = this.topicStoreMap.get(name);
        if (task != null) {
            return this.getStore(name, task);
        }
        task = new FutureTask<Store>(new Callable<Store>(){

            @Override
            public Store call() throws Exception {
                File file = new File(LocalMessageStorageManager.this.META_LOCALMESSAGE_PATH + File.separator + name);
                if (!file.exists()) {
                    file.mkdir();
                }
                return this.newStore(name);
            }

            private Store newStore(String name2) throws IOException {
                return LocalMessageStorageManager.this.newStore(name2);
            }
        });
        FutureTask<Store> existsTask = this.topicStoreMap.putIfAbsent(name, task);
        if (existsTask == null) {
            task.run();
            existsTask = task;
        }
        return this.getStore(name, existsTask);
    }

    private Store getStore(String topic, FutureTask<Store> task) {
        try {
            return task.get();
        }
        catch (Throwable t) {
            log.error((Object)("\u83b7\u53d6topic=" + topic + "\u5bf9\u5e94\u7684store\u5931\u8d25"), t);
            throw new GetRecoverStorageErrorException("\u83b7\u53d6topic=" + topic + "\u5bf9\u5e94\u7684store\u5931\u8d25", t);
        }
    }

    private void makeDataDir() {
        File file = new File(this.META_LOCALMESSAGE_PATH);
        if (!file.exists()) {
            file.mkdir();
        }
    }

    @Override
    public void shutdown() {
        for (Map.Entry<String, FutureTask<Store>> entry : this.topicStoreMap.entrySet()) {
            String name = entry.getKey();
            FutureTask<Store> task = entry.getValue();
            Store store = this.getStore(name, task);
            try {
                store.close();
            }
            catch (IOException e) {}
        }
    }

    @Override
    public void append(Message message, Partition partition) throws IOException {
        Store store = this.getOrCreateStore(message.getTopic(), partition);
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(this.idWorker.nextId());
        store.add(buf.array(), this.serializer.encodeObject((Object)message));
    }

    @Override
    public int getMessageCount(String topic, Partition partition) {
        String name = this.generateKey(topic, partition);
        FutureTask<Store> task = this.topicStoreMap.get(name);
        if (task != null) {
            return this.getStore(name, task).size();
        }
        return 0;
    }

    protected String generateKey(String topic, Partition partition) {
        return topic + SPLIT + partition;
    }

    @Override
    public synchronized void setMessageRecoverer(MessageRecoverManager.MessageRecoverer recoverer) {
        if (this.messageRecoverer == null) {
            this.messageRecoverer = recoverer;
        }
    }

    protected Store newStore(String name) throws IOException {
        return new JournalStore(this.META_LOCALMESSAGE_PATH + File.separator + name, name);
    }
}

