/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.producer;

import com.taobao.gecko.core.command.RequestCommand;
import com.taobao.gecko.core.command.ResponseCommand;
import com.taobao.gecko.core.util.ConcurrentHashSet;
import com.taobao.gecko.core.util.OpaqueGenerator;
import com.taobao.gecko.service.Connection;
import com.taobao.gecko.service.SingleRequestCallBackListener;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.MessageAccessor;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.PartitionSelector;
import com.taobao.metamorphosis.client.producer.ProducerZooKeeper;
import com.taobao.metamorphosis.client.producer.SendMessageCallback;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.client.transaction.TransactionContext;
import com.taobao.metamorphosis.client.transaction.TransactionSession;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.InvalidMessageException;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.exception.MetaOpeartionTimeoutException;
import com.taobao.metamorphosis.exception.TransactionInProgressException;
import com.taobao.metamorphosis.network.BooleanCommand;
import com.taobao.metamorphosis.network.PutCommand;
import com.taobao.metamorphosis.transaction.TransactionId;
import com.taobao.metamorphosis.utils.LongSequenceGenerator;
import com.taobao.metamorphosis.utils.MessageFlagUtils;
import com.taobao.metamorphosis.utils.MessageUtils;
import com.taobao.metamorphosis.utils.MetaStatLog;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import javax.transaction.xa.XAException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Message producer that sends {@link Message}s to a broker through a
 * {@link RemotingClientWrapper}, choosing the target partition and broker via
 * {@link ProducerZooKeeper} and the configured {@link PartitionSelector}.
 *
 * <p>The producer also acts as a {@link TransactionSession}: the current
 * transaction and the broker used by the last successful send inside that
 * transaction are kept in {@link ThreadLocal}s, so transaction state is bound
 * to the calling thread.
 */
public class SimpleMessageProducer implements MessageProducer, TransactionSession {
    private static final Log log = LogFactory.getLog(SimpleMessageProducer.class);

    /** Timeout, in milliseconds, used by the {@code sendMessage} overloads that take no timeout. */
    protected static final long DEFAULT_OP_TIMEOUT = 3000L;

    /**
     * Sends taking longer than this many milliseconds are additionally recorded
     * under the {@code cli_put_timeout} statistic. Read from the system property
     * {@code meta.send.timeout.threshold}, defaulting to 200.
     */
    private static final int TIMEOUT_THRESHOLD =
            Integer.parseInt(System.getProperty("meta.send.timeout.threshold", "200"));

    /** Number of send attempts made by {@link #sendMessageToServer(Message, long, TimeUnit)}. */
    private static final int MAX_RETRY = 1;

    /** Splits the space-separated payload of a successful put response. */
    static final Pattern RESULT_SPLITER = Pattern.compile(" ");

    private final MetaMessageSessionFactory messageSessionFactory;
    protected final RemotingClientWrapper remotingClient;
    protected final PartitionSelector partitionSelector;
    protected final ProducerZooKeeper producerZooKeeper;
    protected final String sessionId;

    /** Transaction timeout in seconds, handed to every newly created {@link TransactionContext}. */
    protected volatile int transactionTimeout = 0;
    private volatile boolean shutdown;
    private final LongSequenceGenerator localTxIdGenerator;

    /** Topics already registered through {@link #publish(String)} or {@link #setDefaultTopic(String)}. */
    private final ConcurrentHashSet<String> publishedTopics = new ConcurrentHashSet<String>();

    /** Broker used by the last successful send of the current thread's transaction, if any. */
    protected final ThreadLocal<LastSentInfo> lastSentInfo = new ThreadLocal<LastSentInfo>();

    /** Transaction begun by the current thread, or unset when there is none. */
    protected final ThreadLocal<TransactionContext> transactionContext = new ThreadLocal<TransactionContext>();

    /**
     * Creates a producer bound to the given session factory and collaborators.
     *
     * @param messageSessionFactory factory that owns this producer; notified on {@link #shutdown()}
     * @param remotingClient client used to talk to brokers
     * @param partitionSelector strategy passed to {@link ProducerZooKeeper} when selecting a partition
     * @param producerZooKeeper source of topic, partition and broker information
     * @param sessionId identifier returned by {@link #getSessionId()}
     */
    public SimpleMessageProducer(MetaMessageSessionFactory messageSessionFactory,
            RemotingClientWrapper remotingClient, PartitionSelector partitionSelector,
            ProducerZooKeeper producerZooKeeper, String sessionId) {
        this.sessionId = sessionId;
        this.messageSessionFactory = messageSessionFactory;
        this.remotingClient = remotingClient;
        this.partitionSelector = partitionSelector;
        this.producerZooKeeper = producerZooKeeper;
        this.localTxIdGenerator = new LongSequenceGenerator();
    }

    /**
     * @return the session factory this producer was created with
     */
    public MetaMessageSessionFactory getParent() {
        return this.messageSessionFactory;
    }

    @Override
    public PartitionSelector getPartitionSelector() {
        return this.partitionSelector;
    }

    /**
     * @return always {@code false}
     */
    @Override
    @Deprecated
    public boolean isOrdered() {
        return false;
    }

    /**
     * Registers {@code topic} with {@link ProducerZooKeeper} unless this producer
     * has already registered it.
     *
     * @param topic topic to publish; must not be blank
     * @throws IllegalStateException if the producer has been shut down
     * @throws IllegalArgumentException if {@code topic} is blank
     */
    @Override
    public void publish(String topic) {
        this.checkState();
        this.checkTopic(topic);
        if (!this.publishedTopics.contains(topic)) {
            this.producerZooKeeper.publishTopic(topic, this);
            this.publishedTopics.add(topic);
        }
    }

    /**
     * Registers {@code topic} as the default topic with {@link ProducerZooKeeper}
     * unless this producer has already registered it. Unlike
     * {@link #publish(String)}, no state or blank-topic validation is performed here.
     *
     * @param topic topic to use as default
     */
    @Override
    public void setDefaultTopic(String topic) {
        if (!this.publishedTopics.contains(topic)) {
            this.producerZooKeeper.setDefaultTopic(topic, this);
            this.publishedTopics.add(topic);
        }
    }

    /** Rejects {@code null}, empty and whitespace-only topics. */
    private void checkTopic(String topic) {
        if (StringUtils.isBlank(topic)) {
            throw new IllegalArgumentException("Blank topic:" + topic);
        }
    }

    /**
     * Validates and synchronously sends {@code message}.
     *
     * @param message message to send; must have a non-blank topic and non-null data
     * @param timeout maximum time to wait for the broker
     * @param unit unit of {@code timeout}
     * @return the outcome reported by the broker
     * @throws MetaClientException if the message is invalid or the send fails
     * @throws InterruptedException if the calling thread is interrupted while sending
     * @throws IllegalStateException if the producer has been shut down
     */
    @Override
    public SendResult sendMessage(Message message, long timeout, TimeUnit unit)
            throws MetaClientException, InterruptedException {
        this.checkState();
        this.checkMessage(message);
        return this.sendMessageToServer(message, timeout, unit);
    }

    /**
     * Encodes the message once and attempts to send it up to {@link #MAX_RETRY}
     * times, recording duration and retry statistics whether or not the send succeeds.
     *
     * @param message message to send
     * @param timeout maximum time to wait for the broker, per attempt
     * @param unit unit of {@code timeout}
     * @return the result of the last attempt
     * @throws MetaOpeartionTimeoutException if an attempt fails and the overall elapsed time
     *         has reached the timeout
     * @throws MetaClientException if the send fails for another reason
     * @throws InterruptedException if the calling thread is interrupted while sending
     */
    protected SendResult sendMessageToServer(Message message, long timeout, TimeUnit unit)
            throws MetaClientException, InterruptedException, MetaOpeartionTimeoutException {
        SendResult result = null;
        final long start = System.currentTimeMillis();
        int retry = 0;
        final long timeoutInMills = TimeUnit.MILLISECONDS.convert(timeout, unit);
        final byte[] data = MessageUtils.encodePayload(message);
        try {
            for (int i = 0; i < MAX_RETRY; ++i) {
                result = this.send0(message, data, timeout, unit);
                if (result.isSuccess()) {
                    break;
                }
                if (System.currentTimeMillis() - start >= timeoutInMills) {
                    throw new MetaOpeartionTimeoutException("Send message timeout in " + timeoutInMills + " mills");
                }
                ++retry;
            }
        }
        finally {
            // Statistics are recorded on both the success and the failure path.
            final long duration = System.currentTimeMillis() - start;
            final String topic = message.getTopic();
            MetaStatLog.addStatValue2(null, "cli_put_time", topic, duration);
            if (duration > TIMEOUT_THRESHOLD) {
                MetaStatLog.addStatValue2(null, "cli_put_timeout", topic, duration);
            }
            if (retry > 0) {
                MetaStatLog.addStatValue2(null, "cli_put_retry", topic, (long) retry);
            }
        }
        return result;
    }

    /** Selects a partition for the message's topic using the configured selector. */
    private Partition selectPartition(Message message) throws MetaClientException {
        return this.producerZooKeeper.selectPartition(message.getTopic(), message, this.partitionSelector);
    }

    /**
     * @return the current thread's transaction
     * @throws MetaClientException if the current thread has not begun a transaction
     */
    private TransactionContext getTx() throws MetaClientException {
        final TransactionContext ctx = this.transactionContext.get();
        if (ctx == null) {
            throw new MetaClientException("There is no transaction begun");
        }
        return ctx;
    }

    /**
     * Clears the current thread's transaction and last-sent information.
     *
     * @param ctx not consulted; whatever context the current thread holds is removed
     */
    @Override
    public void removeContext(TransactionContext ctx) {
        this.transactionContext.remove();
        this.resetLastSentInfo();
    }

    @Override
    public String getSessionId() {
        return this.sessionId;
    }

    /**
     * Sets the timeout applied to transactions begun after this call.
     *
     * @param seconds timeout in seconds; must not be negative
     * @throws IllegalArgumentException if {@code seconds} is negative
     */
    @Override
    public void setTransactionTimeout(int seconds) throws MetaClientException {
        if (seconds < 0) {
            throw new IllegalArgumentException("Illegal transaction timeout value");
        }
        this.transactionTimeout = seconds;
    }

    @Override
    public int getTransactionTimeout() throws MetaClientException {
        return this.transactionTimeout;
    }

    /**
     * Begins a transaction bound to the calling thread.
     *
     * @throws TransactionInProgressException if the calling thread already has a transaction
     */
    @Override
    public void beginTransaction() throws MetaClientException {
        if (this.transactionContext.get() != null) {
            throw new TransactionInProgressException("A transaction has begun");
        }
        final TransactionContext ctx = new TransactionContext(this.remotingClient, null, this,
                this.localTxIdGenerator, this.transactionTimeout);
        this.transactionContext.set(ctx);
    }

    /**
     * Called before the first send of a transaction: binds the transaction to
     * {@code serverUrl} and begins it if it has no transaction id yet.
     *
     * @param serverUrl broker the first message of the transaction is sent to
     * @throws MetaClientException if the calling thread has no transaction
     * @throws XAException declared for {@link TransactionContext#begin()}
     */
    protected void beforeSendMessageFirstTime(String serverUrl) throws MetaClientException, XAException {
        final TransactionContext tx = this.getTx();
        if (tx.getTransactionId() == null) {
            tx.setServerUrl(serverUrl);
            tx.begin();
        }
    }

    /**
     * Remembers {@code serverUrl} as the broker of the current transaction, if the
     * calling thread is in a transaction and has not remembered one yet.
     *
     * @param serverUrl broker the message was sent to
     */
    protected void logLastSentInfo(String serverUrl) {
        if (this.isInTransaction() && this.lastSentInfo.get() == null) {
            this.lastSentInfo.set(new LastSentInfo(serverUrl));
        }
    }

    /**
     * @return the id of the calling thread's transaction, or {@code null} when the
     *         thread is not in a transaction
     * @throws MetaClientException declared by the transaction lookup
     */
    protected TransactionId getTransactionId() throws MetaClientException {
        if (this.isInTransaction()) {
            return this.getTx().getTransactionId();
        }
        return null;
    }

    /**
     * @return {@code true} if the calling thread has begun a transaction
     */
    protected boolean isInTransaction() {
        return this.transactionContext.get() != null;
    }

    /**
     * Commits the calling thread's transaction. The thread-bound transaction state
     * is cleared even when the commit fails.
     *
     * @throws MetaClientException if there is no transaction or the commit fails
     */
    @Override
    public void commit() throws MetaClientException {
        try {
            this.getTx().commit();
        }
        finally {
            this.resetLastSentInfo();
            this.transactionContext.remove();
        }
    }

    /**
     * Rolls back the calling thread's transaction. The thread-bound transaction
     * state is cleared even when the rollback fails.
     *
     * @throws MetaClientException if there is no transaction or the rollback fails
     */
    @Override
    public void rollback() throws MetaClientException {
        try {
            this.getTx().rollback();
        }
        finally {
            this.resetLastSentInfo();
            this.transactionContext.remove();
        }
    }

    /** Forgets the broker remembered for the calling thread's transaction. */
    protected void resetLastSentInfo() {
        this.lastSentInfo.remove();
    }

    /**
     * Performs a single send attempt: picks partition and broker, begins the
     * transaction on first use, issues the put command and converts the response.
     *
     * @throws MetaOpeartionTimeoutException if the remoting call times out
     * @throws MetaClientException if no partition or broker is available, or the send fails;
     *         exceptions of this type raised while preparing the send are propagated unchanged
     * @throws InterruptedException if the calling thread is interrupted
     */
    private SendResult send0(Message message, byte[] encodedData, long timeout, TimeUnit unit)
            throws InterruptedException, MetaClientException {
        try {
            final String topic = message.getTopic();
            Partition partition = null;
            String serverUrl = null;
            // Inside a transaction every message must go to the broker chosen by
            // the first send, so only partitions on that broker are eligible.
            if (this.isInTransaction()) {
                final LastSentInfo info = this.lastSentInfo.get();
                if (info != null) {
                    serverUrl = info.serverUrl;
                    partition = this.producerZooKeeper.selectPartition(topic, message, this.partitionSelector,
                            serverUrl);
                    if (partition == null) {
                        throw new MetaClientException("There is no partitions in `" + serverUrl
                                + "` to send message with topic `" + topic + "` in a transaction");
                    }
                }
            }
            if (partition == null) {
                partition = this.selectPartition(message);
            }
            if (partition == null) {
                throw new MetaClientException("There is no aviable partition for topic " + topic
                        + ",maybe you don't publish it at first?");
            }
            if (serverUrl == null) {
                serverUrl = this.producerZooKeeper.selectBroker(topic, partition);
            }
            if (serverUrl == null) {
                throw new MetaClientException("There is no aviable server right now for topic " + topic
                        + " and partition " + partition + ",maybe you don't publish it at first?");
            }
            if (this.isInTransaction() && this.lastSentInfo.get() == null) {
                this.beforeSendMessageFirstTime(serverUrl);
            }
            final int flag = MessageFlagUtils.getFlag(message);
            final PutCommand putCommand = new PutCommand(topic, partition.getPartition(), encodedData,
                    this.getTransactionId(), flag, Integer.valueOf(OpaqueGenerator.getNextOpaque()));
            final BooleanCommand resp = this.invokeToGroup(serverUrl, partition, putCommand, message, timeout, unit);
            return this.genSendResult(message, partition, serverUrl, resp);
        }
        catch (TimeoutException e) {
            throw new MetaOpeartionTimeoutException("Send message timeout in "
                    + TimeUnit.MILLISECONDS.convert(timeout, unit) + " mills");
        }
        catch (InterruptedException e) {
            throw e;
        }
        catch (MetaClientException e) {
            // Already carries a precise message (and possibly a specific subtype);
            // do not bury it under the generic wrapper below.
            throw e;
        }
        catch (Exception e) {
            throw new MetaClientException("send message failed", e);
        }
    }

    /**
     * Converts a broker response into a {@link SendResult}.
     *
     * <p>On code 200 the response text is split on spaces and read as
     * {@code <message id> <partition> <offset>}; the id and partition are written
     * back into {@code message}. Any other code yields an unsuccessful result.
     *
     * @throws RuntimeException if a code-200 response does not contain three numeric fields
     */
    private SendResult genSendResult(Message message, Partition partition, String serverUrl, BooleanCommand resp) {
        final String resultStr = resp.getErrorMsg();
        switch (resp.getCode()) {
            case 200: {
                final String[] tmps = RESULT_SPLITER.split(resultStr);
                MessageAccessor.setId(message, Long.parseLong(tmps[0]));
                final Partition serverPart = new Partition(partition.getBrokerId(), Integer.parseInt(tmps[1]));
                MessageAccessor.setPartition(message, serverPart);
                this.logLastSentInfo(serverUrl);
                return new SendResult(true, serverPart, Long.parseLong(tmps[2]), null);
            }
            case 403: {
                if (log.isDebugEnabled()) {
                    log.debug(resultStr);
                }
                // The error message of the result is the status code itself.
                return new SendResult(false, null, -1L, String.valueOf(403));
            }
            default: {
                return new SendResult(false, null, -1L, resultStr);
            }
        }
    }

    /**
     * Sends {@code putCommand} to {@code serverUrl} and waits for the response.
     * {@code partition} and {@code message} are not used by this implementation;
     * they are available to overriding subclasses.
     */
    protected BooleanCommand invokeToGroup(String serverUrl, Partition partition, PutCommand putCommand,
            Message message, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException, NotifyRemotingException {
        return (BooleanCommand) this.remotingClient.invokeToGroup(serverUrl, (RequestCommand) putCommand, timeout,
                unit);
    }

    /**
     * @throws IllegalStateException if the producer has been shut down
     */
    protected void checkState() {
        if (this.shutdown) {
            throw new IllegalStateException("Producer has been shutdown");
        }
    }

    /**
     * @throws InvalidMessageException if the message is {@code null}, has a blank topic,
     *         or has {@code null} data
     */
    protected void checkMessage(Message message) throws MetaClientException {
        if (message == null) {
            throw new InvalidMessageException("Null message");
        }
        if (StringUtils.isBlank(message.getTopic())) {
            throw new InvalidMessageException("Blank topic");
        }
        if (message.getData() == null) {
            throw new InvalidMessageException("Null data");
        }
    }

    /**
     * Sends {@code message} asynchronously. Every failure, including validation
     * failures and a response that cannot be interpreted, is reported through
     * {@link SendMessageCallback#onException}; nothing is thrown to the caller.
     *
     * @param message message to send
     * @param cb callback notified with the result or the failure
     * @param time maximum time to wait for the broker response
     * @param unit unit of {@code time}
     */
    @Override
    public void sendMessage(final Message message, final SendMessageCallback cb, long time, TimeUnit unit) {
        try {
            // Same validation as the synchronous path, reported via the callback.
            this.checkState();
            this.checkMessage(message);
            final String topic = message.getTopic();
            final Partition partition = this.selectPartition(message);
            if (partition == null) {
                throw new MetaClientException("There is no aviable partition for topic " + topic
                        + ",maybe you don't publish it at first?");
            }
            final String serverUrl = this.producerZooKeeper.selectBroker(topic, partition);
            if (serverUrl == null) {
                throw new MetaClientException("There is no aviable server right now for topic " + topic
                        + " and partition " + partition + ",maybe you don't publish it at first?");
            }
            final int flag = MessageFlagUtils.getFlag(message);
            final byte[] encodedData = MessageUtils.encodePayload(message);
            final PutCommand putCommand = new PutCommand(topic, partition.getPartition(), encodedData,
                    this.getTransactionId(), flag, Integer.valueOf(OpaqueGenerator.getNextOpaque()));
            this.remotingClient.sendToGroup(serverUrl, (RequestCommand) putCommand,
                    new SingleRequestCallBackListener() {

                public void onResponse(ResponseCommand responseCommand, Connection conn) {
                    final SendResult rt;
                    try {
                        rt = SimpleMessageProducer.this.genSendResult(message, partition, serverUrl,
                                (BooleanCommand) responseCommand);
                    }
                    catch (RuntimeException e) {
                        // An unexpected response type or malformed payload must
                        // still reach the caller instead of being lost here.
                        cb.onException(e);
                        return;
                    }
                    cb.onMessageSent(rt);
                }

                public void onException(Exception e) {
                    cb.onException(e);
                }

                public ThreadPoolExecutor getExecutor() {
                    return null;
                }
            }, time, unit);
        }
        catch (Throwable e) {
            cb.onException(e);
        }
    }

    /**
     * Sends {@code message} asynchronously with a timeout of
     * {@link #DEFAULT_OP_TIMEOUT} milliseconds.
     */
    @Override
    public void sendMessage(Message message, SendMessageCallback cb) {
        this.sendMessage(message, cb, DEFAULT_OP_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /**
     * Sends {@code message} synchronously with a timeout of
     * {@link #DEFAULT_OP_TIMEOUT} milliseconds.
     */
    @Override
    public SendResult sendMessage(Message message) throws MetaClientException, InterruptedException {
        return this.sendMessage(message, DEFAULT_OP_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /**
     * Unpublishes every registered topic, marks the producer as shut down and
     * removes it from its session factory. Calling it again after a completed
     * shutdown is a no-op.
     *
     * @throws MetaClientException declared for the unpublish and factory calls
     */
    @Override
    public synchronized void shutdown() throws MetaClientException {
        if (this.shutdown) {
            return;
        }
        for (String topic : this.publishedTopics) {
            this.producerZooKeeper.unPublishTopic(topic, this);
        }
        this.shutdown = true;
        this.publishedTopics.clear();
        this.messageSessionFactory.removeChild(this);
    }

    /** Records the broker a transactional message was last sent to. */
    static class LastSentInfo {
        final String serverUrl;

        public LastSentInfo(String serverUrl) {
            this.serverUrl = serverUrl;
        }
    }
}

