package com.taobao.metamorphosis.client.consumer;

import com.taobao.common.store.Store;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.extension.storage.MessageStore;
import com.taobao.metamorphosis.exception.GetRecoverStorageErrorException;
import com.taobao.metamorphosis.utils.NamedThreadFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/metamorphosis/client/consumer/RecoverStorageManager.class */
/**
 * Recover manager that keeps messages in on-disk stores under
 * {@link #META_RECOVER_STORE_PATH} and periodically replays them to the
 * registered {@link MessageListener}s.
 *
 * <p>Each store lives in a sub-directory named {@code <topic>@<second-part>};
 * the second part is the first argument of {@link #append(String, Message)}
 * (presumably the consumer group -- confirm against callers). Stores are
 * created lazily and cached in a map of {@link FutureTask}s so that concurrent
 * callers asking for the same key share a single creation attempt.
 */
public class RecoverStorageManager extends AbstractRecoverManager {
    private static final String SPLIT = "@";
    static final Log log = LogFactory.getLog(RecoverStorageManager.class);
    public static final String META_RECOVER_STORE_PATH = System.getProperty("meta.recover.path", System.getProperty("user.home") + File.separator + ".meta_recover");
    private final SubscribeInfoManager subscribeInfoManager;
    private final ThreadPoolExecutor threadPoolExecutor;
    private boolean started;
    private final ConcurrentHashMap<String, FutureTask<Store>> topicStoreMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    /** Monotonic offset added to a message id when the store reports a duplicate key. */
    private final AtomicLong count = new AtomicLong(0);
    /** True until the "listener is null" warning has been logged once. */
    boolean wasFirst = true;

    /**
     * Creates the recover thread pool, ensures the data directory exists and
     * opens every store directory already present on disk.
     *
     * @param metaClientConfig     supplies the recover thread count
     * @param subscribeInfoManager used to look up the listener for a store during recovery
     */
    public RecoverStorageManager(MetaClientConfig metaClientConfig, SubscribeInfoManager subscribeInfoManager) {
        int recoverThreads = metaClientConfig.getRecoverThreadCount();
        this.threadPoolExecutor = new ThreadPoolExecutor(
            recoverThreads,
            recoverThreads,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(100),
            new NamedThreadFactory("Recover-thread"),
            new ThreadPoolExecutor.CallerRunsPolicy());
        makeDataDir();
        this.subscribeInfoManager = subscribeInfoManager;
        loadStores();
    }

    /**
     * @return whether {@link #start(MetaClientConfig)} has already scheduled the recover task
     */
    @Override // com.taobao.metamorphosis.client.consumer.RecoverManager
    public synchronized boolean isStarted() {
        return this.started;
    }

    /**
     * Schedules the periodic recover task (first run after 5 seconds, then at
     * the configured interval). Calling it again after a successful start is a no-op.
     *
     * @param metaClientConfig supplies the recover interval in milliseconds
     */
    @Override // com.taobao.metamorphosis.client.consumer.RecoverManager
    public synchronized void start(MetaClientConfig metaClientConfig) {
        if (this.started) {
            return;
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                // An exception escaping a scheduleAtFixedRate task cancels all
                // future runs, so never let one out of here.
                try {
                    RecoverStorageManager.this.recover();
                } catch (RuntimeException e) {
                    log.error("Scheduled recover run failed", e);
                }
            }
        }, 5000L, metaClientConfig.getRecoverMessageIntervalInMills(), TimeUnit.MILLISECONDS);
        this.started = true;
    }

    /**
     * Opens a store for every sub-directory of the data directory whose name
     * has the form {@code a@b}.
     */
    private void loadStores() {
        File[] children = new File(META_RECOVER_STORE_PATH).listFiles();
        if (children == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            log.warn("Cannot list recover store directory " + META_RECOVER_STORE_PATH);
            return;
        }
        for (File child : children) {
            if (!child.isDirectory()) {
                continue;
            }
            String name = child.getName();
            String[] parts = name.split(SPLIT);
            if (parts.length == 2) {
                log.info("加载recover storage " + name + " ...");
                getOrCreateStore(parts[0], parts[1]);
            }
        }
    }

    /** Creates the data directory (including missing parents) if it does not exist yet. */
    private void makeDataDir() {
        File dataDir = new File(META_RECOVER_STORE_PATH);
        if (dataDir.exists()) {
            return;
        }
        // mkdirs() rather than mkdir(): a custom meta.recover.path may have missing parents.
        if (!dataDir.mkdirs() && !dataDir.exists()) {
            log.warn("Cannot create recover store directory " + META_RECOVER_STORE_PATH);
        }
    }

    /**
     * Submits one recover task per known store. Each task iterates the keys of
     * its store, decodes the stored message and hands it to
     * {@link #receiveMessage(Store, byte[], Message, MessageListener)}.
     * Failures are logged per store and never propagate to the caller.
     */
    public void recover() {
        for (final Map.Entry<String, FutureTask<Store>> entry : this.topicStoreMap.entrySet()) {
            this.threadPoolExecutor.execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    String topic = entry.getKey();
                    try {
                        String[] parts = entry.getKey().split(RecoverStorageManager.SPLIT);
                        topic = parts[0];
                        String group = parts[1];
                        // Resolved inside the try: with CallerRunsPolicy this task may run on
                        // the scheduler thread, where an escaping exception would be fatal
                        // for the periodic schedule.
                        Store<byte[]> store = RecoverStorageManager.this.getStore(topic, entry.getValue());
                        MessageListener listener = RecoverStorageManager.this.subscribeInfoManager.getMessageListener(topic, group);
                        int visited = 0;
                        for (byte[] key : store) {
                            Message message = (Message) RecoverStorageManager.this.deserializer.decodeObject(store.get(key));
                            if (message != null) {
                                RecoverStorageManager.this.receiveMessage(store, key, message, listener);
                            }
                            visited++;
                        }
                        RecoverStorageManager.log.info("Recover topic=" + topic + "恢复消息" + visited + "条");
                    } catch (Exception e) {
                        RecoverStorageManager.log.error("Recover message failed,topic=" + topic, e);
                    }
                }
            });
        }
    }

    /**
     * Delivers one recovered message to the listener, on the listener's own
     * executor when it has one and on the calling thread otherwise.
     *
     * <p>When {@code messageListener} is null the message is left in the store
     * and a warning is logged the first time only.
     *
     * @param store           store the message was read from
     * @param bArr            key of the message inside {@code store}
     * @param message         decoded message
     * @param messageListener listener to notify, may be null
     */
    public void receiveMessage(final Store store, final byte[] bArr, final Message message, final MessageListener messageListener) {
        if (messageListener == null) {
            if (this.wasFirst) {
                log.warn("messageListener为null,可能是消费者还未创建");
                this.wasFirst = false;
            }
            return;
        }
        if (messageListener.getExecutor() == null) {
            notifyListener(store, bArr, message, messageListener);
            return;
        }
        try {
            messageListener.getExecutor().execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    RecoverStorageManager.this.notifyListener(store, bArr, message, messageListener);
                }
            });
        } catch (RejectedExecutionException e) {
            // The message was not removed from the store, so it is still there
            // for a later recover run; just make the rejection visible.
            log.warn("Listener executor rejected recovered message id=" + message.getId() + ", it stays in the recover store");
        }
    }

    /**
     * Passes the message to the listener and then removes it from the store.
     * If the listener throws, the message is not removed.
     *
     * @param store           store holding the message
     * @param bArr            key of the message inside {@code store}
     * @param message         message to deliver
     * @param messageListener listener receiving the message, must not be null
     */
    public void notifyListener(Store store, byte[] bArr, Message message, MessageListener messageListener) {
        messageListener.recieveMessages(message);
        try {
            store.remove(bArr);
        } catch (IOException e) {
            log.error("Remove message failed", e);
        }
    }

    /**
     * Stops the scheduler and the recover pool, then closes every store.
     * The executors are shut down first so that no new recover task is started
     * against a store that is being closed, and a failure to close one store
     * neither prevents the others from being closed nor leaves threads running.
     */
    @Override // com.taobao.metamorphosis.client.Shutdownable
    public void shutdown() {
        this.scheduledExecutorService.shutdown();
        this.threadPoolExecutor.shutdown();
        for (Map.Entry<String, FutureTask<Store>> entry : this.topicStoreMap.entrySet()) {
            try {
                getStore(entry.getKey().split(SPLIT)[0], entry.getValue()).close();
            } catch (IOException | RuntimeException e) {
                log.error("Close recover store failed,key=" + entry.getKey(), e);
            }
        }
    }

    /**
     * Persists a message into the store for {@code message.getTopic()} and
     * {@code str}. The key is a 16-byte buffer whose first 8 bytes are the
     * message id. When the store rejects the key as a duplicate (its message
     * contains "重复"), the id is bumped and the write retried, up to 5 attempts.
     *
     * @param str     second part of the store key (presumably the consumer group -- confirm)
     * @param message message to persist
     * @throws IOException if the store fails for a non-duplicate reason, or
     *                     if all 5 attempts hit a duplicate key
     */
    @Override // com.taobao.metamorphosis.client.consumer.RecoverManager
    public void append(String str, Message message) throws IOException {
        Store store = getOrCreateStore(message.getTopic(), str);
        long id = message.getId();
        IOException lastDuplicate = null;
        for (int attempt = 0; attempt < 5; attempt++) {
            try {
                ByteBuffer keyBuffer = ByteBuffer.allocate(16);
                keyBuffer.putLong(id);
                store.add(keyBuffer.array(), this.serializer.encodeObject(message));
                return;
            } catch (IOException e) {
                String reason = e.getMessage();
                // An IOException may carry no message at all; treat that as "not a duplicate".
                if (reason == null || !reason.contains("重复")) {
                    throw e;
                }
                lastDuplicate = e;
                log.warn("写入recover store出错,key=" + id + "," + reason + ",retry...");
                id += this.count.incrementAndGet();
            }
        }
        if (lastDuplicate != null) {
            throw lastDuplicate;
        }
    }

    /**
     * Returns the store for the given key parts, creating its directory and
     * opening it on first use. Concurrent callers for the same key share one
     * creation attempt. If that attempt fails, the failed entry is dropped
     * from the cache so a later call can try again.
     *
     * @param str  first part of the store key (the topic)
     * @param str2 second part of the store key
     * @return the opened store
     * @throws GetRecoverStorageErrorException if the store cannot be obtained
     */
    public Store getOrCreateStore(String str, String str2) {
        final String key = generateKey(str, str2);
        FutureTask<Store> cached = this.topicStoreMap.get(key);
        if (cached != null) {
            return getStore(str, cached);
        }
        FutureTask<Store> creation = new FutureTask<>(new Callable<Store>() {
            @Override // java.util.concurrent.Callable
            public Store call() throws Exception {
                String storePath = RecoverStorageManager.META_RECOVER_STORE_PATH + File.separator + key;
                File storeDir = new File(storePath);
                if (!storeDir.exists()) {
                    storeDir.mkdirs();
                }
                return new MessageStore(storePath, key);
            }
        });
        FutureTask<Store> raced = this.topicStoreMap.putIfAbsent(key, creation);
        if (raced != null) {
            // Another thread registered its task first; wait for that one.
            return getStore(key, raced);
        }
        creation.run();
        try {
            return getStore(key, creation);
        } catch (RuntimeException e) {
            // The task has completed, so this is a real creation failure; do not
            // keep the permanently failed task cached.
            this.topicStoreMap.remove(key, creation);
            throw e;
        }
    }

    /** Builds the map key / directory name {@code str@str2}. */
    private String generateKey(String str, String str2) {
        return str + SPLIT + str2;
    }

    /**
     * Waits for the given creation task and returns its store.
     *
     * @param str        name used only in log and exception messages
     * @param futureTask task producing the store
     * @return the store produced by {@code futureTask}
     * @throws GetRecoverStorageErrorException wrapping any failure of the task
     *                                         or of waiting for it
     */
    public Store getStore(String str, FutureTask<Store> futureTask) {
        try {
            return futureTask.get();
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Keep the interrupt visible to the caller's thread.
                Thread.currentThread().interrupt();
            }
            log.error("获取name=" + str + "对应的store失败", th);
            throw new GetRecoverStorageErrorException("获取topic=" + str + "对应的store失败", th);
        }
    }
}
