/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.consumer;

import com.taobao.common.store.Store;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.consumer.AbstractRecoverManager;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.client.consumer.SubscribeInfoManager;
import com.taobao.metamorphosis.client.extension.storage.MessageStore;
import com.taobao.metamorphosis.exception.GetRecoverStorageErrorException;
import com.taobao.metamorphosis.utils.NamedThreadFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RecoverStorageManager
extends AbstractRecoverManager {
    private static final String SPLIT = "@";
    private final ConcurrentHashMap<String, FutureTask<Store>> topicStoreMap = new ConcurrentHashMap();
    static final Log log = LogFactory.getLog(RecoverStorageManager.class);
    public static final String META_RECOVER_STORE_PATH = System.getProperty("meta.recover.path", System.getProperty("user.home") + File.separator + ".meta_recover");
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    private final SubscribeInfoManager subscribeInfoManager;
    private final ThreadPoolExecutor threadPoolExecutor;
    private final AtomicLong count = new AtomicLong(0L);
    private boolean started;
    boolean wasFirst = true;

    public RecoverStorageManager(MetaClientConfig metaClientConfig, SubscribeInfoManager subscribeInfoManager) {
        this.threadPoolExecutor = new ThreadPoolExecutor(metaClientConfig.getRecoverThreadCount(), metaClientConfig.getRecoverThreadCount(), 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), (ThreadFactory)new NamedThreadFactory("Recover-thread"), new ThreadPoolExecutor.CallerRunsPolicy());
        this.makeDataDir();
        this.subscribeInfoManager = subscribeInfoManager;
        this.loadStores();
    }

    @Override
    public synchronized boolean isStarted() {
        return this.started;
    }

    @Override
    public synchronized void start(MetaClientConfig metaClientConfig) {
        if (this.started) {
            return;
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                RecoverStorageManager.this.recover();
            }
        }, 5000L, metaClientConfig.getRecoverMessageIntervalInMills(), TimeUnit.MILLISECONDS);
        this.started = true;
    }

    private void loadStores() {
        File[] files;
        File dataPath = new File(META_RECOVER_STORE_PATH);
        for (File subFile : files = dataPath.listFiles()) {
            String name;
            String[] tmps;
            if (!subFile.isDirectory() || (tmps = (name = subFile.getName()).split(SPLIT)).length != 2) continue;
            log.info((Object)("\u52a0\u8f7drecover storage " + name + " ..."));
            this.getOrCreateStore(tmps[0], tmps[1]);
        }
    }

    private void makeDataDir() {
        File file = new File(META_RECOVER_STORE_PATH);
        if (!file.exists()) {
            file.mkdir();
        }
    }

    private void recover() {
        for (final Map.Entry<String, FutureTask<Store>> entry : this.topicStoreMap.entrySet()) {
            this.threadPoolExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    String name = (String)entry.getKey();
                    String[] tmps = name.split(RecoverStorageManager.SPLIT);
                    String topic = tmps[0];
                    String group = tmps[1];
                    FutureTask task = (FutureTask)entry.getValue();
                    Store store = RecoverStorageManager.this.getStore(topic, task);
                    try {
                        MessageListener listener = RecoverStorageManager.this.subscribeInfoManager.getMessageListener(topic, group);
                        Iterator it = store.iterator();
                        int count = 0;
                        while (it.hasNext()) {
                            byte[] key = (byte[])it.next();
                            Message msg = (Message)RecoverStorageManager.this.deserializer.decodeObject(store.get(key));
                            if (msg != null) {
                                RecoverStorageManager.this.receiveMessage(store, key, msg, listener);
                            }
                            ++count;
                        }
                        log.info((Object)("Recover topic=" + topic + "\u6062\u590d\u6d88\u606f" + count + "\u6761"));
                    }
                    catch (Exception e) {
                        log.error((Object)("Recover message failed,topic=" + topic), (Throwable)e);
                    }
                }
            });
        }
    }

    private void receiveMessage(final Store store, final byte[] key, final Message msg, final MessageListener messageListener) {
        if (messageListener == null) {
            if (this.wasFirst) {
                log.warn((Object)"messageListener\u4e3anull,\u53ef\u80fd\u662f\u6d88\u8d39\u8005\u8fd8\u672a\u521b\u5efa");
                this.wasFirst = false;
            }
            return;
        }
        if (messageListener.getExecutor() != null) {
            try {
                messageListener.getExecutor().execute(new Runnable(){

                    @Override
                    public void run() {
                        RecoverStorageManager.this.notifyListener(store, key, msg, messageListener);
                    }
                });
            }
            catch (RejectedExecutionException e) {}
        } else {
            this.notifyListener(store, key, msg, messageListener);
        }
    }

    private void notifyListener(Store store, byte[] key, Message msg, MessageListener messageListener) {
        messageListener.recieveMessages(msg);
        try {
            store.remove(key);
        }
        catch (IOException e) {
            log.error((Object)"Remove message failed", (Throwable)e);
        }
    }

    @Override
    public void shutdown() {
        for (Map.Entry<String, FutureTask<Store>> entry : this.topicStoreMap.entrySet()) {
            String name = entry.getKey();
            String[] tmps = name.split(SPLIT);
            String topic = tmps[0];
            FutureTask<Store> task = entry.getValue();
            Store store = this.getStore(topic, task);
            try {
                store.close();
            }
            catch (IOException e) {}
        }
        this.threadPoolExecutor.shutdown();
        this.scheduledExecutorService.shutdown();
    }

    @Override
    public void append(String group, Message message) throws IOException {
        Store store = this.getOrCreateStore(message.getTopic(), group);
        long key = message.getId();
        IOException error = null;
        for (int i = 0; i < 5; ++i) {
            try {
                ByteBuffer buf = ByteBuffer.allocate(16);
                buf.putLong(key);
                store.add(buf.array(), this.serializer.encodeObject((Object)message));
                return;
            }
            catch (IOException e) {
                String msg = e.getMessage();
                if (msg.contains("\u91cd\u590d")) {
                    error = e;
                    log.warn((Object)("\u5199\u5165recover store\u51fa\u9519,key=" + key + "," + e.getMessage() + ",retry..."));
                    key += this.count.incrementAndGet();
                    continue;
                }
                throw e;
            }
        }
        if (error != null) {
            throw error;
        }
    }

    public Store getOrCreateStore(String topic, String group) {
        final String name = this.generateKey(topic, group);
        FutureTask<Store> task = this.topicStoreMap.get(name);
        if (task != null) {
            return this.getStore(topic, task);
        }
        task = new FutureTask<Store>(new Callable<Store>(){

            @Override
            public Store call() throws Exception {
                File file = new File(META_RECOVER_STORE_PATH + File.separator + name);
                if (!file.exists()) {
                    file.mkdir();
                }
                return new MessageStore(META_RECOVER_STORE_PATH + File.separator + name, name);
            }
        });
        FutureTask<Store> existsTask = this.topicStoreMap.putIfAbsent(name, task);
        if (existsTask == null) {
            task.run();
            existsTask = task;
        }
        return this.getStore(name, existsTask);
    }

    private String generateKey(String topic, String group) {
        return topic + SPLIT + group;
    }

    private Store getStore(String name, FutureTask<Store> task) {
        try {
            return task.get();
        }
        catch (Throwable t) {
            log.error((Object)("\u83b7\u53d6name=" + name + "\u5bf9\u5e94\u7684store\u5931\u8d25"), t);
            throw new GetRecoverStorageErrorException("\u83b7\u53d6topic=" + name + "\u5bf9\u5e94\u7684store\u5931\u8d25", t);
        }
    }
}

