package com.alibaba.otter.canal.connector.rocketmq.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
import com.alibaba.otter.canal.connector.core.spi.SPI;
import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
import com.alibaba.otter.canal.connector.core.util.MessageUtil;
import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQConstants;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SPI(RocketMQConstants.ROOT)
/* loaded from: input_file:com/alibaba/otter/canal/connector/rocketmq/consumer/CanalRocketMQConsumer.class */
public class CanalRocketMQConsumer implements CanalMsgConsumer {
    private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQConsumer.class);
    private static final String CLOUD_ACCESS_CHANNEL = "cloud";
    private String nameServer;
    private String topic;
    private String groupName;
    private DefaultMQPushConsumer rocketMQConsumer;
    private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;
    private boolean flatMessage;
    private String accessKey;
    private String secretKey;
    private String customizedTraceTopic;
    private String accessChannel;
    private String namespace;
    private int batchSize = -1;
    private long batchProcessTimeout = 60000;
    private volatile ConsumerBatchMessage<CommonMessage> lastGetBatchMessage = null;
    private boolean enableMessageTrace = false;
    private String filter = "*";

    public void init(Properties properties, String str, String str2) {
        this.topic = str;
        this.groupName = str2;
        this.flatMessage = ((Boolean) properties.get("canal.mq.flatMessage")).booleanValue();
        this.messageBlockingQueue = new LinkedBlockingQueue(1024);
        this.accessKey = properties.getProperty("canal.aliyun.accessKey");
        this.secretKey = properties.getProperty("canal.aliyun.secretKey");
        String property = properties.getProperty(RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
        if (StringUtils.isNotEmpty(property)) {
            this.enableMessageTrace = Boolean.parseBoolean(property);
        }
        this.customizedTraceTopic = properties.getProperty(RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
        this.accessChannel = properties.getProperty(RocketMQConstants.ROCKETMQ_ACCESS_CHANNEL);
        this.namespace = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESPACE);
        this.nameServer = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESRV_ADDR);
        String property2 = properties.getProperty(RocketMQConstants.ROCKETMQ_BATCH_SIZE);
        if (StringUtils.isNotEmpty(property2)) {
            this.batchSize = Integer.parseInt(property2);
        }
        String property3 = properties.getProperty(RocketMQConstants.ROCKETMQ_SUBSCRIBE_FILTER);
        if (StringUtils.isNotEmpty(property3)) {
            this.filter = property3;
        }
    }

    public void connect() {
        AclClientRPCHook aclClientRPCHook = null;
        if (null != this.accessKey && this.accessKey.length() > 0 && null != this.secretKey && this.secretKey.length() > 0) {
            SessionCredentials sessionCredentials = new SessionCredentials();
            sessionCredentials.setAccessKey(this.accessKey);
            sessionCredentials.setSecretKey(this.secretKey);
            aclClientRPCHook = new AclClientRPCHook(sessionCredentials);
        }
        this.rocketMQConsumer = new DefaultMQPushConsumer(this.groupName, aclClientRPCHook, new AllocateMessageQueueAveragely(), this.enableMessageTrace, this.customizedTraceTopic);
        this.rocketMQConsumer.setVipChannelEnabled(false);
        if (CLOUD_ACCESS_CHANNEL.equals(this.accessChannel)) {
            this.rocketMQConsumer.setAccessChannel(AccessChannel.CLOUD);
        }
        if (!StringUtils.isEmpty(this.namespace)) {
            this.rocketMQConsumer.setNamespace(this.namespace);
        }
        if (!StringUtils.isBlank(this.nameServer)) {
            this.rocketMQConsumer.setNamesrvAddr(this.nameServer);
        }
        if (this.batchSize != -1) {
            this.rocketMQConsumer.setConsumeMessageBatchMaxSize(this.batchSize);
        }
        try {
            if (this.rocketMQConsumer == null) {
                connect();
            }
            this.rocketMQConsumer.subscribe(this.topic, this.filter);
            this.rocketMQConsumer.registerMessageListener((list, consumeOrderlyContext) -> {
                consumeOrderlyContext.setAutoCommit(true);
                return process(list) ? ConsumeOrderlyStatus.SUCCESS : ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            });
            this.rocketMQConsumer.start();
        } catch (MQClientException e) {
            logger.error("Start RocketMQ consumer error", e);
        }
    }

    private boolean process(List<MessageExt> list) {
        if (logger.isDebugEnabled()) {
            logger.debug("Get Message: {}", list);
        }
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<MessageExt> it = list.iterator();
        while (it.hasNext()) {
            byte[] body = it.next().getBody();
            if (body != null) {
                try {
                    if (this.flatMessage) {
                        newArrayList.add((CommonMessage) JSON.parseObject(body, CommonMessage.class, new Feature[0]));
                    } else {
                        newArrayList.addAll(MessageUtil.convert(CanalMessageSerializerUtil.deserializer(body)));
                    }
                } catch (Exception e) {
                    logger.error("Add message error", e);
                    throw new CanalClientException(e);
                }
            } else {
                logger.warn("Received message data is null");
            }
        }
        ConsumerBatchMessage<CommonMessage> consumerBatchMessage = new ConsumerBatchMessage<>(newArrayList);
        try {
            this.messageBlockingQueue.put(consumerBatchMessage);
            try {
                return consumerBatchMessage.waitFinish(this.batchProcessTimeout) && consumerBatchMessage.isSuccess();
            } catch (InterruptedException e2) {
                logger.error("Interrupted when waiting messages to be finished.", e2);
                throw new RuntimeException(e2);
            }
        } catch (InterruptedException e3) {
            logger.error("Put message to queue error", e3);
            throw new RuntimeException(e3);
        }
    }

    public List<CommonMessage> getMessage(Long l, TimeUnit timeUnit) {
        try {
            if (this.lastGetBatchMessage != null) {
                throw new CanalClientException("mq get/ack not support concurrent & async ack");
            }
            ConsumerBatchMessage<CommonMessage> poll = this.messageBlockingQueue.poll(l.longValue(), timeUnit);
            if (poll == null) {
                return null;
            }
            this.lastGetBatchMessage = poll;
            return poll.getData();
        } catch (InterruptedException e) {
            logger.warn("Get message timeout", e);
            throw new CanalClientException("Failed to fetch the data after: " + l);
        }
    }

    public void rollback() {
        try {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.fail();
            }
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    public void ack() {
        try {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.ack();
            }
        } catch (Throwable th) {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.fail();
            }
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    public void disconnect() {
        this.rocketMQConsumer.unsubscribe(this.topic);
        this.rocketMQConsumer.shutdown();
    }
}
