package com.alibaba.otter.canal.connector.rocketmq.producer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.alibaba.otter.canal.common.utils.PropertiesUtils;
import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
import com.alibaba.otter.canal.connector.core.producer.MQDestination;
import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
import com.alibaba.otter.canal.connector.core.spi.SPI;
import com.alibaba.otter.canal.connector.core.util.Callback;
import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQConstants;
import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQProducerConfig;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SPI(RocketMQConstants.ROOT)
/* loaded from: input_file:com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.class */
public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQProducer {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
    /** RocketMQ client used for every send; created and started in {@code init}, shut down in {@code stop}. */
    private DefaultMQProducer defaultMQProducer;
    /** Configured access-channel value that makes {@code init} switch the client to {@code AccessChannel.CLOUD}. */
    private static final String CLOUD_ACCESS_CHANNEL = "cloud";
    /** Pool that runs the per-partition send tasks; created in {@code init}, terminated via {@code shutdownNow()} in {@code stop}. */
    protected ThreadPoolExecutor sendPartitionExecutor;

    /**
     * Builds the RocketMQ configuration from {@code properties}, creates and starts the
     * underlying {@link DefaultMQProducer}, and creates the per-partition sender pool.
     *
     * <p>An ACL hook is attached only when both the access key and the secret key are
     * non-empty; a missing (null) key is treated the same as an empty one.
     *
     * @param properties connector properties passed to {@code super.init} and to
     *                   {@code loadRocketMQProperties}
     * @throws CanalException if the RocketMQ client fails to start
     */
    @Override
    public void init(Properties properties) {
        RocketMQProducerConfig rocketMQProducerConfig = new RocketMQProducerConfig();
        // The config object must be installed before super.init / loadRocketMQProperties read it.
        this.mqProperties = rocketMQProducerConfig;
        super.init(properties);
        loadRocketMQProperties(properties);

        AclClientRPCHook aclClientRPCHook = null;
        String accessKey = this.mqProperties.getAliyunAccessKey();
        String secretKey = this.mqProperties.getAliyunSecretKey();
        // Null-safe emptiness check: calling length() directly would throw on an unset key.
        if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
            SessionCredentials sessionCredentials = new SessionCredentials();
            sessionCredentials.setAccessKey(accessKey);
            sessionCredentials.setSecretKey(secretKey);
            aclClientRPCHook = new AclClientRPCHook(sessionCredentials);
        }

        this.defaultMQProducer = new DefaultMQProducer(rocketMQProducerConfig.getProducerGroup(),
            aclClientRPCHook,
            rocketMQProducerConfig.isEnableMessageTrace(),
            rocketMQProducerConfig.getCustomizedTraceTopic());
        if (CLOUD_ACCESS_CHANNEL.equals(rocketMQProducerConfig.getAccessChannel())) {
            this.defaultMQProducer.setAccessChannel(AccessChannel.CLOUD);
        }
        if (!StringUtils.isEmpty(rocketMQProducerConfig.getNamespace())) {
            this.defaultMQProducer.setNamespace(rocketMQProducerConfig.getNamespace());
        }
        this.defaultMQProducer.setNamesrvAddr(rocketMQProducerConfig.getNamesrvAddr());
        this.defaultMQProducer.setRetryTimesWhenSendFailed(rocketMQProducerConfig.getRetryTimesWhenSendFailed());
        this.defaultMQProducer.setVipChannelEnabled(rocketMQProducerConfig.isVipChannelEnabled());

        logger.info("##Start RocketMQ producer##");
        try {
            this.defaultMQProducer.start();
            int parallelism = this.mqProperties.getParallelSendThreadSize().intValue();
            // Fixed-size pool with a bounded queue; when the queue is full the submitting
            // thread runs the task itself (CallerRunsPolicy).
            this.sendPartitionExecutor = new ThreadPoolExecutor(parallelism,
                parallelism,
                0L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(parallelism * 2),
                new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
                new ThreadPoolExecutor.CallerRunsPolicy());
        } catch (MQClientException e) {
            throw new CanalException("Start RocketMQ producer error", e);
        }
    }

    /**
     * Copies the RocketMQ-specific settings found in {@code properties} onto the
     * {@link RocketMQProducerConfig} stored in {@code mqProperties}. A setting is applied
     * only when its property value is non-empty; otherwise the config keeps its current value.
     *
     * @param properties source of the RocketMQ settings
     * @throws NumberFormatException if the retry-times property is set but is not an integer
     */
    private void loadRocketMQProperties(Properties properties) {
        RocketMQProducerConfig config = (RocketMQProducerConfig) this.mqProperties;

        // Presumably maps the legacy "canal.mq.*" keys onto the RocketMQ-specific keys
        // (behavior of doMoreCompatibleConvert is defined outside this file) - TODO confirm.
        doMoreCompatibleConvert("canal.mq.servers", RocketMQConstants.ROCKETMQ_NAMESRV_ADDR, properties);
        doMoreCompatibleConvert("canal.mq.producerGroup", RocketMQConstants.ROCKETMQ_PRODUCER_GROUP, properties);
        doMoreCompatibleConvert("canal.mq.namespace", RocketMQConstants.ROCKETMQ_NAMESPACE, properties);
        doMoreCompatibleConvert("canal.mq.retries", RocketMQConstants.ROCKETMQ_RETRY_TIMES_WHEN_SEND_FAILED, properties);

        String producerGroup = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_PRODUCER_GROUP);
        if (!StringUtils.isEmpty(producerGroup)) {
            config.setProducerGroup(producerGroup);
        }

        String enableMessageTrace = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
        if (!StringUtils.isEmpty(enableMessageTrace)) {
            config.setEnableMessageTrace(Boolean.parseBoolean(enableMessageTrace));
        }

        String customizedTraceTopic = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
        if (!StringUtils.isEmpty(customizedTraceTopic)) {
            config.setCustomizedTraceTopic(customizedTraceTopic);
        }

        String namespace = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_NAMESPACE);
        if (!StringUtils.isEmpty(namespace)) {
            config.setNamespace(namespace);
        }

        String namesrvAddr = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_NAMESRV_ADDR);
        if (!StringUtils.isEmpty(namesrvAddr)) {
            config.setNamesrvAddr(namesrvAddr);
        }

        String retryTimes = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_RETRY_TIMES_WHEN_SEND_FAILED);
        if (!StringUtils.isEmpty(retryTimes)) {
            config.setRetryTimesWhenSendFailed(Integer.parseInt(retryTimes));
        }

        String vipChannelEnabled = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
        if (!StringUtils.isEmpty(vipChannelEnabled)) {
            config.setVipChannelEnabled(Boolean.parseBoolean(vipChannelEnabled));
        }

        String tag = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_TAG);
        if (!StringUtils.isEmpty(tag)) {
            config.setTag(tag);
        }

        // NOTE(review): init() consults config.getAccessChannel(), but no access-channel
        // property is loaded in this method - confirm it is populated elsewhere.
    }

    /**
     * Sends one canal message to its destination topic(s) and reports the outcome
     * through {@code callback}.
     *
     * <p>When the destination has no dynamic-topic expression the message goes to the
     * destination's own topic on the calling thread. Otherwise the message is split per
     * topic, every '.' in a topic name is replaced by '_', and the per-topic sends are
     * submitted to {@code sendExecutor} and awaited.
     *
     * <p>Any failure is logged and answered with {@code callback.rollback()}; success is
     * answered with {@code callback.commit()}. The executor template is cleared exactly
     * once, in a {@code finally} block, so a failure during cleanup can never cause a
     * rollback after a successful commit.
     *
     * @param mQDestination routing configuration (topic, dynamic topic, partitioning)
     * @param message       canal message to publish
     * @param callback      commit/rollback hook invoked with the send outcome
     */
    public void send(MQDestination mQDestination, Message message, Callback callback) {
        ExecutorTemplate executorTemplate = new ExecutorTemplate(this.sendExecutor);
        try {
            if (StringUtils.isEmpty(mQDestination.getDynamicTopic())) {
                send(mQDestination, mQDestination.getTopic(), message);
            } else {
                Map<String, Message> messagesByTopic = MQMessageUtils.messageTopics(message,
                    mQDestination.getTopic(),
                    mQDestination.getDynamicTopic());
                for (Map.Entry<String, Message> entry : messagesByTopic.entrySet()) {
                    String topicName = entry.getKey().replace('.', '_');
                    Message topicMessage = entry.getValue();
                    executorTemplate.submit(() -> {
                        try {
                            send(mQDestination, topicName, topicMessage);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    });
                }
                executorTemplate.waitForResult();
            }
            callback.commit();
        } catch (Throwable th) {
            logger.error(th.getMessage(), th);
            callback.rollback();
        } finally {
            executorTemplate.clear();
        }
    }

    /**
     * Sends one canal message to the topic {@code str}.
     *
     * <p>The partition count is taken from the destination's dynamic-topic partition
     * configuration for this topic and falls back to the destination's
     * {@code getPartitionsNum()} when none is resolved. The message is then published
     * either as serialized canal messages or as JSON flat messages, depending on
     * {@code mqProperties.isFlatMessage()}.
     *
     * <p>Without a partition-hash configuration everything goes to the queue index
     * given by {@code getPartition()} (0 when unset). With one, the data is split by
     * {@code MQMessageUtils.messagePartition} and each non-null slice is sent, through
     * {@code sendPartitionExecutor}, to the queue index equal to its slice position.
     *
     * @param mQDestination routing configuration
     * @param str           target topic name
     * @param message       canal message to publish
     */
    public void send(MQDestination mQDestination, String str, Message message) {
        Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(str, mQDestination.getDynamicTopicPartitionNum());
        if (partitionNum == null) {
            partitionNum = mQDestination.getPartitionsNum();
        }
        if (this.mqProperties.isFlatMessage()) {
            sendFlatCanalMessages(mQDestination, str, message, partitionNum);
        } else {
            sendBinaryCanalMessage(mQDestination, str, message, partitionNum);
        }
    }

    /** Publishes {@code message} in canal's serialized (non-flat) form, optionally split by partition hash. */
    private void sendBinaryCanalMessage(MQDestination mQDestination, String str, Message message, Integer partitionNum) {
        if (!hasRocketPartitionHash(mQDestination)) {
            sendMessage(buildBinaryRocketMessage(str, message), resolveFixedQueueIndex(mQDestination));
            return;
        }
        Message[] partitions = MQMessageUtils.messagePartition(MQMessageUtils.buildMessageData(message, this.buildExecutor),
            message.getId(),
            partitionNum,
            mQDestination.getPartitionHash(),
            this.mqProperties.isDatabaseHash());
        ExecutorTemplate template = new ExecutorTemplate(this.sendPartitionExecutor);
        for (int idx = 0; idx < partitions.length; idx++) {
            Message part = partitions[idx];
            if (part == null) {
                continue;
            }
            int queueIndex = idx;
            template.submit(() -> {
                sendMessage(buildBinaryRocketMessage(str, part), queueIndex);
            });
        }
        template.waitForResult();
    }

    /** Publishes {@code message} as JSON flat messages, optionally grouped by partition hash and sent in batches. */
    private void sendFlatCanalMessages(MQDestination mQDestination, String str, Message message, Integer partitionNum) {
        List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(MQMessageUtils.buildMessageData(message, this.buildExecutor), message.getId());
        if (!hasRocketPartitionHash(mQDestination)) {
            sendMessage(toFlatRocketMessages(str, flatMessages), resolveFixedQueueIndex(mQDestination));
            return;
        }

        // Buckets are grown from the slice index actually returned by messagePartition.
        // Pre-sizing them from getPartitionsNum() overflowed whenever the resolved
        // (dynamic-topic) partition count was larger, and threw when it was null.
        List<List<FlatMessage>> buckets = new ArrayList<>();
        for (FlatMessage flatMessage : flatMessages) {
            FlatMessage[] slices = MQMessageUtils.messagePartition(flatMessage,
                partitionNum,
                mQDestination.getPartitionHash(),
                this.mqProperties.isDatabaseHash());
            for (int idx = 0; idx < slices.length; idx++) {
                if (slices[idx] == null) {
                    continue;
                }
                while (buckets.size() <= idx) {
                    buckets.add(new ArrayList<>());
                }
                buckets.get(idx).add(slices[idx]);
            }
        }

        ExecutorTemplate template = new ExecutorTemplate(this.sendPartitionExecutor);
        for (int idx = 0; idx < buckets.size(); idx++) {
            List<FlatMessage> bucket = buckets.get(idx);
            if (bucket.isEmpty()) {
                continue;
            }
            int queueIndex = idx;
            template.submit(() -> {
                sendMessage(toFlatRocketMessages(str, bucket), queueIndex);
            });
        }
        template.waitForResult();
    }

    /** Returns whether the destination carries a non-empty partition-hash configuration. */
    private boolean hasRocketPartitionHash(MQDestination mQDestination) {
        return mQDestination.getPartitionHash() != null && !mQDestination.getPartitionHash().isEmpty();
    }

    /** Returns the destination's configured partition as a queue index, or 0 when none is configured. */
    private int resolveFixedQueueIndex(MQDestination mQDestination) {
        return mQDestination.getPartition() != null ? mQDestination.getPartition().intValue() : 0;
    }

    /** Wraps a serialized canal message in a RocketMQ message for {@code topic}, using the configured tag. */
    private org.apache.rocketmq.common.message.Message buildBinaryRocketMessage(String topic, Message message) {
        return new org.apache.rocketmq.common.message.Message(topic,
            ((RocketMQProducerConfig) this.mqProperties).getTag(),
            CanalMessageSerializerUtil.serializer(message, this.mqProperties.isFilterTransactionEntry()));
    }

    /** Converts flat messages to RocketMQ messages for {@code topic}; JSON bodies keep null-valued map entries. */
    private List<org.apache.rocketmq.common.message.Message> toFlatRocketMessages(String topic, List<FlatMessage> flatMessages) {
        return flatMessages.stream()
            .map(flatMessage -> new org.apache.rocketmq.common.message.Message(topic,
                ((RocketMQProducerConfig) this.mqProperties).getTag(),
                JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
            .collect(Collectors.toList());
    }

    /**
     * Sends a single RocketMQ message to the queue selected by index {@code i}.
     * The index wraps around (modulo) when it is not smaller than the number of queues
     * offered to the selector.
     *
     * @param message message to send
     * @param i       desired queue index
     * @throws RuntimeException wrapping any failure raised by the client; if the failure
     *                          is an {@link InterruptedException} the thread's interrupt
     *                          status is restored before rethrowing
     */
    private void sendMessage(org.apache.rocketmq.common.message.Message message, int i) {
        try {
            SendResult sendResult = this.defaultMQProducer.send(message, (queues, msg, arg) -> {
                return i >= queues.size() ? queues.get(i % queues.size()) : queues.get(i);
            }, (Object) null);
            if (logger.isDebugEnabled()) {
                logger.debug("Send Message Result: {}", sendResult);
            }
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Keep the interrupt visible to the worker pool instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(th);
        }
    }

    /**
     * Sends a batch of RocketMQ messages to the queue selected by index {@code i}.
     *
     * <p>The target queue is looked up in the client's publish-info table under the
     * topic of the first message. When no publish info, or no queue, is available for
     * that topic, the messages are sent one by one through the single-message overload
     * instead. An empty list is a no-op.
     *
     * @param list messages to send; the first element's topic is used for the lookup
     * @param i    desired queue index; wraps around (modulo) when not smaller than the
     *             number of known queues
     * @throws RuntimeException wrapping any failure raised by the client; if the failure
     *                          is an {@link InterruptedException} the thread's interrupt
     *                          status is restored before rethrowing
     */
    private void sendMessage(List<org.apache.rocketmq.common.message.Message> list, int i) {
        if (list.isEmpty()) {
            return;
        }
        TopicPublishInfo topicPublishInfo = (TopicPublishInfo) this.defaultMQProducer.getDefaultMQProducerImpl()
            .getTopicPublishInfoTable()
            .get(list.get(0).getTopic());
        List<MessageQueue> queues = topicPublishInfo == null ? null : topicPublishInfo.getMessageQueueList();
        if (queues == null || queues.isEmpty()) {
            // No routing data to pick a queue from: fall back to per-message sends.
            for (org.apache.rocketmq.common.message.Message single : list) {
                sendMessage(single, i);
            }
            return;
        }

        int queueCount = queues.size();
        MessageQueue target = i >= queueCount ? queues.get(i % queueCount) : queues.get(i);
        try {
            SendResult sendResult = this.defaultMQProducer.send(list, target);
            if (logger.isDebugEnabled()) {
                logger.debug("Send Message Result: {}", sendResult);
            }
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Keep the interrupt visible to the worker pool instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(th);
        }
    }

    /**
     * Shuts down the RocketMQ client and the per-partition sender pool, then delegates
     * to {@code super.stop()}.
     *
     * <p>Each step is skipped when its resource was never created, and the later steps
     * still run when an earlier one throws, so a failed or partial {@code init} does
     * not prevent the remaining resources from being released.
     */
    @Override
    public void stop() {
        logger.info("## Stop RocketMQ producer##");
        try {
            if (this.defaultMQProducer != null) {
                this.defaultMQProducer.shutdown();
            }
        } finally {
            try {
                if (this.sendPartitionExecutor != null) {
                    this.sendPartitionExecutor.shutdownNow();
                }
            } finally {
                super.stop();
            }
        }
    }
}
