package com.taobao.metamorphosis.client.extension.producer;

import com.taobao.common.store.Store;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager;
import com.taobao.metamorphosis.client.extension.storage.MessageStore;
import com.taobao.metamorphosis.cluster.Partition;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/* loaded from: input_file:com/taobao/metamorphosis/client/extension/producer/OrderedLocalMessageStorageManager.class */
public class OrderedLocalMessageStorageManager extends LocalMessageStorageManager {
    public OrderedLocalMessageStorageManager(MetaClientConfig metaClientConfig) {
        super(metaClientConfig);
    }

    @Override // com.taobao.metamorphosis.client.extension.producer.LocalMessageStorageManager, com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager
    public void recover() {
        Set<String> keySet = this.topicStoreMap.keySet();
        if (keySet == null || keySet.size() == 0) {
            log.info("SendRecover没有需要恢复的消息");
            return;
        }
        if (this.messageRecoverer == null) {
            log.warn("messageRecoverer还未设置");
            return;
        }
        Iterator<String> it = keySet.iterator();
        while (it.hasNext()) {
            String[] split = it.next().split("@");
            String str = split[0];
            Partition partition = new Partition(split[1]);
            if (!partition.equals(Partition.RandomPartiton) && getMessageCount(str, partition) > 0) {
                recover(str, partition, this.messageRecoverer);
            }
        }
    }

    @Override // com.taobao.metamorphosis.client.extension.producer.LocalMessageStorageManager, com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager
    public boolean recover(final String str, final Partition partition, final MessageRecoverManager.MessageRecoverer messageRecoverer) {
        final String generateKey = generateKey(str, partition);
        FutureTask<Boolean> futureTask = new FutureTask<>(new Callable<Boolean>() { // from class: com.taobao.metamorphosis.client.extension.producer.OrderedLocalMessageStorageManager.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                try {
                    try {
                        Store orCreateStore = OrderedLocalMessageStorageManager.this.getOrCreateStore(str, Partition.RandomPartiton);
                        if (orCreateStore.size() > 0) {
                            synchronized (orCreateStore) {
                                if (orCreateStore.size() > 0) {
                                    LocalMessageStorageManager.log.info("SendRecover topic=" + str + "@-1--1恢复消息" + innerRecover(orCreateStore, messageRecoverer) + "条");
                                }
                            }
                        }
                        LocalMessageStorageManager.log.info("SendRecover topic=" + generateKey + "恢复消息" + innerRecover(OrderedLocalMessageStorageManager.this.getOrCreateStore(str, partition), messageRecoverer) + "条");
                        LocalMessageStorageManager.log.info("SendRecover执行完毕移除发送恢复任务,topic=" + generateKey);
                        OrderedLocalMessageStorageManager.this.topicRecoverTaskMap.remove(generateKey);
                    } catch (Throwable th) {
                        LocalMessageStorageManager.log.error("SendRecover发送消息恢复失败,topic=" + generateKey, th);
                        LocalMessageStorageManager.log.info("SendRecover执行完毕移除发送恢复任务,topic=" + generateKey);
                        OrderedLocalMessageStorageManager.this.topicRecoverTaskMap.remove(generateKey);
                    }
                    return true;
                } catch (Throwable th2) {
                    LocalMessageStorageManager.log.info("SendRecover执行完毕移除发送恢复任务,topic=" + generateKey);
                    OrderedLocalMessageStorageManager.this.topicRecoverTaskMap.remove(generateKey);
                    throw th2;
                }
            }

            private int innerRecover(Store store, MessageRecoverManager.MessageRecoverer messageRecoverer2) throws IOException, Exception {
                Iterator it = store.iterator();
                int i = 0;
                while (it.hasNext()) {
                    byte[] bArr = (byte[]) it.next();
                    messageRecoverer2.handle((Message) OrderedLocalMessageStorageManager.this.deserializer.decodeObject(store.get(bArr)));
                    try {
                        store.remove(bArr);
                        i++;
                    } catch (IOException e) {
                        LocalMessageStorageManager.log.error("SendRecover remove message failed", e);
                    }
                }
                return i;
            }
        });
        if (this.topicRecoverTaskMap.putIfAbsent(generateKey, futureTask) == null) {
            this.threadPoolExecutor.submit(futureTask);
            return true;
        }
        if (!log.isDebugEnabled()) {
            return false;
        }
        log.debug("SendRecover发送恢复任务正在运行,不需要重新启动,topic=" + str);
        return false;
    }

    @Override // com.taobao.metamorphosis.client.extension.producer.LocalMessageStorageManager
    protected Store newStore(String str) throws IOException {
        return new MessageStore(this.META_LOCALMESSAGE_PATH + File.separator + str, str);
    }
}
