/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.extension.producer;

import com.taobao.gecko.core.command.RequestCommand;
import com.taobao.gecko.core.command.ResponseCommand;
import com.taobao.gecko.core.command.ResponseStatus;
import com.taobao.gecko.core.command.kernel.BooleanAckCommand;
import com.taobao.gecko.service.Connection;
import com.taobao.gecko.service.SingleRequestCallBackListener;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.extension.producer.AsyncIgnoreMessageProcessor;
import com.taobao.metamorphosis.client.extension.producer.AsyncMessageProducer;
import com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager;
import com.taobao.metamorphosis.client.extension.producer.SlidingWindow;
import com.taobao.metamorphosis.client.producer.PartitionSelector;
import com.taobao.metamorphosis.client.producer.ProducerZooKeeper;
import com.taobao.metamorphosis.client.producer.SimpleMessageProducer;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.InvalidMessageException;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.network.BooleanCommand;
import com.taobao.metamorphosis.network.PutCommand;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Message producer that hands messages to the remoting client with a callback
 * instead of waiting for the broker's answer, and diverts messages that could
 * not be sent to an {@link AsyncMessageProducer.IgnoreMessageProcessor}.
 *
 * <p>Every outgoing put request first has to acquire capacity from a
 * {@link SlidingWindow}, sized by the request's payload length. The capacity
 * is given back when the response (or failure) callback fires, or immediately
 * when the request could not be handed to the remoting client at all.
 *
 * <p>The class also implements {@link MessageRecoverManager.MessageRecoverer}:
 * {@link #handle(Message)} simply feeds the message back into
 * {@link #asyncSendMessage(Message)}.
 */
public class AsyncMetaMessageProducer
extends SimpleMessageProducer
implements AsyncMessageProducer,
MessageRecoverManager.MessageRecoverer {
    private static final Log log = LogFactory.getLog(AsyncMetaMessageProducer.class);
    /** Sliding window size used when the caller passes a non-positive size. */
    private static final int DEFAULT_PERMITS = 20000;
    /** Timeout, in milliseconds, applied by {@link #asyncSendMessage(Message)}. */
    private static final long DEFAULT_SEND_TIMEOUT_MILLIS = 3000L;
    private final SlidingWindow slidingWindow;
    private AsyncMessageProducer.IgnoreMessageProcessor ignoreMessageProcessor;

    /**
     * Creates the producer.
     *
     * @param messageSessionFactory session factory, forwarded to the superclass and used to
     *        obtain the client config for the default processor
     * @param remotingClient remoting client, forwarded to the superclass
     * @param partitionSelector partition selector, forwarded to the superclass
     * @param producerZooKeeper producer ZooKeeper helper, forwarded to the superclass
     * @param sessionId session id, forwarded to the superclass
     * @param slidingWindowSize0 sliding window size; values {@code <= 0} select
     *        {@link #DEFAULT_PERMITS}
     * @param processor processor for messages that could not be sent; when {@code null} an
     *        {@link AsyncIgnoreMessageProcessor} bound to this producer is created
     */
    public AsyncMetaMessageProducer(MetaMessageSessionFactory messageSessionFactory, RemotingClientWrapper remotingClient, PartitionSelector partitionSelector, ProducerZooKeeper producerZooKeeper, String sessionId, int slidingWindowSize0, AsyncMessageProducer.IgnoreMessageProcessor processor) {
        super(messageSessionFactory, remotingClient, partitionSelector, producerZooKeeper, sessionId);
        this.slidingWindow = new SlidingWindow(slidingWindowSize0 > 0 ? slidingWindowSize0 : DEFAULT_PERMITS);
        this.ignoreMessageProcessor = processor != null ? processor : new AsyncIgnoreMessageProcessor(messageSessionFactory.getMetaClientConfig(), this);
    }

    /**
     * Sends the message with the default timeout of 3000 milliseconds.
     *
     * @param message message to send
     */
    @Override
    public void asyncSendMessage(Message message) {
        this.asyncSendMessage(message, DEFAULT_SEND_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Sends the message through the superclass' {@code sendMessage} and never propagates a
     * failure to the caller.
     *
     * <ul>
     * <li>{@link IllegalStateException} and {@link InvalidMessageException} are logged and the
     * message is dropped.</li>
     * <li>{@link InterruptedException} restores the thread's interrupt flag; the message is
     * not handed to the processor.</li>
     * <li>{@link MetaClientException} and any other {@link Throwable} hand the message to the
     * configured {@link AsyncMessageProducer.IgnoreMessageProcessor}.</li>
     * </ul>
     *
     * @param message message to send
     * @param timeout send timeout
     * @param unit unit of {@code timeout}
     */
    @Override
    public void asyncSendMessage(Message message, long timeout, TimeUnit unit) {
        try {
            super.sendMessage(message, timeout, unit);
        }
        catch (IllegalStateException e) {
            log.warn(e);
        }
        catch (InvalidMessageException e) {
            log.warn(e);
        }
        catch (MetaClientException e) {
            if (log.isDebugEnabled()) {
                log.debug("save to local strage,and waitting for recover. cause:" + e.getMessage());
            }
            this.handleSendFailMessage(message);
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();
        }
        catch (Throwable e) {
            if (log.isDebugEnabled()) {
                log.debug("save to local strage,and waitting for recover. cause:", e);
            }
            this.handleSendFailMessage(message);
        }
    }

    /**
     * Replaces the processor that receives messages which could not be sent.
     *
     * @param ignoreMessageProcessor new processor; with {@code null}, overflowing sends are
     *        reported as {@link MetaMessageOverflowException} instead of being diverted
     */
    @Override
    public void setIgnoreMessageProcessor(AsyncMessageProducer.IgnoreMessageProcessor ignoreMessageProcessor) {
        this.ignoreMessageProcessor = ignoreMessageProcessor;
    }

    /**
     * Sends the put command without waiting for the broker's response. When the send is
     * rejected as an overflow, the message is diverted via
     * {@link #processOverMessage(Partition, PutCommand, Message, MetaMessageOverflowException)}.
     *
     * @return a locally built placeholder result; it is not the broker's answer
     * @throws NotifyRemotingException when the remoting client rejects the request for a
     *         non-overflow reason, or on overflow when no processor is configured
     */
    @Override
    protected BooleanCommand invokeToGroup(String serverUrl, Partition partition, PutCommand putCommand, Message message, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, NotifyRemotingException {
        try {
            return this.trySend(serverUrl, putCommand, timeout, unit);
        }
        catch (MetaMessageOverflowException e) {
            if (log.isDebugEnabled()) {
                log.debug("save to local strage,and waitting for recover. cause:" + e.getMessage());
            }
            return this.processOverMessage(partition, putCommand, message, e);
        }
    }

    /**
     * Acquires sliding window capacity for the payload and hands the request to the remoting
     * client together with a callback that releases the capacity again.
     *
     * <p>If the request cannot be handed over for any reason (not only
     * {@link NotifyRemotingException}), the capacity is released right away so that a failed
     * hand-over cannot shrink the window permanently. The release goes through the listener's
     * guard, so it happens at most once even if the callback is invoked as well.
     *
     * @throws MetaMessageOverflowException when the window has no capacity left, or when the
     *         remoting client reports one of its own flow / callback limits
     * @throws NotifyRemotingException for any other remoting failure
     */
    private BooleanCommand trySend(String serverUrl, PutCommand putCommand, long timeout, TimeUnit unit) throws NotifyRemotingException, InterruptedException {
        final byte[] data = putCommand.getData();
        final int dataLength = data != null ? data.length : 0;
        if (!this.slidingWindow.tryAcquireByLength(dataLength)) {
            // Message text: "send traffic exceeds the total number of sliding window units:".
            throw new MetaMessageOverflowException("\u53d1\u9001\u6d88\u606f\u6d41\u91cf\u8d85\u8fc7\u6ed1\u52a8\u7a97\u53e3\u5355\u4f4d\u603b\u6570\uff1a" + this.slidingWindow.getWindowsSize());
        }
        final MessageSendCallBackListener listener = new MessageSendCallBackListener(putCommand);
        boolean handedOver = false;
        try {
            this.remotingClient.sendToGroup(serverUrl, (RequestCommand)putCommand, listener, timeout, unit);
            handedOver = true;
            return AsyncMetaMessageProducer.placeholderResult(putCommand);
        }
        catch (NotifyRemotingException e) {
            if (AsyncMetaMessageProducer.isOverflowMessage(e.getMessage())) {
                throw new MetaMessageOverflowException(e);
            }
            throw e;
        }
        finally {
            if (!handedOver) {
                listener.release();
            }
        }
    }

    /**
     * Tells whether a remoting exception message denotes one of the two limit violations that
     * are treated as overflow: "flow limit exceeded" or "maximum number of allowed callbacks
     * exceeded" (matched against the remoting layer's Chinese message text).
     *
     * @param message exception message, may be {@code null}
     * @return {@code true} only for a non-null message containing one of the two markers
     */
    private static boolean isOverflowMessage(String message) {
        if (message == null) {
            return false;
        }
        return message.contains("\u8d85\u8fc7\u6d41\u91cf\u9650\u5236") || message.contains("\u8d85\u8fc7\u5141\u8bb8\u7684\u6700\u5927CallBack\u4e2a\u6570");
    }

    /**
     * Builds the result returned to the caller in place of a broker response: same opaque as
     * the request, code 200 and the text {@code "-1 <partition> -1"}.
     */
    // NOTE(review): the two -1 fields presumably stand for values only the broker knows (e.g. id/offset) - confirm.
    private static BooleanCommand placeholderResult(PutCommand putCommand) {
        return new BooleanCommand(putCommand.getOpaque(), 200, "-1 " + putCommand.getPartition() + " -1");
    }

    /**
     * Diverts an overflowing message to the configured processor and reports a placeholder
     * result; without a processor the overflow exception is rethrown.
     */
    private BooleanCommand processOverMessage(Partition partition, PutCommand putCommand, Message message, MetaMessageOverflowException e2) throws MetaMessageOverflowException {
        if (this.ignoreMessageProcessor != null) {
            this.handleSendFailMessage(message);
            return AsyncMetaMessageProducer.placeholderResult(putCommand);
        }
        throw e2;
    }

    /**
     * Passes a message that could not be sent to the processor. Never throws: a missing
     * processor or a processor failure is logged only.
     */
    private void handleSendFailMessage(Message message) {
        // Read the field once so the null check and the call see the same processor.
        final AsyncMessageProducer.IgnoreMessageProcessor processor = this.ignoreMessageProcessor;
        if (processor == null) {
            log.warn("no IgnoreMessageProcessor configured, message that failed to send is dropped");
            return;
        }
        try {
            processor.handle(message);
        }
        catch (Exception e1) {
            log.warn("IgnoreMessageProcessor failed to handle message", e1);
        }
    }

    /**
     * {@link MessageRecoverManager.MessageRecoverer} entry point: sends the given message
     * again through {@link #asyncSendMessage(Message)}.
     */
    @Override
    public void handle(Message msg) throws Exception {
        this.asyncSendMessage(msg);
    }

    /** Returns the currently configured processor; may be {@code null} after a setter call. */
    AsyncMessageProducer.IgnoreMessageProcessor getIgnoreMessageProcessor() {
        return this.ignoreMessageProcessor;
    }

    /**
     * Callback attached to every put request. Its job is to give the request's sliding
     * window capacity back exactly once and to log non-successful outcomes.
     */
    private class MessageSendCallBackListener
    implements SingleRequestCallBackListener {
        /** Payload length that was acquired from the window for this request. */
        private final int dataLenth;
        /** Guards against releasing the same capacity more than once. */
        private final AtomicBoolean released = new AtomicBoolean(false);

        MessageSendCallBackListener(PutCommand putCommand) {
            byte[] data = putCommand.getData();
            this.dataLenth = data != null ? data.length : 0;
        }

        /** Releases the capacity and logs a warning for any status other than NO_ERROR. */
        public void onResponse(ResponseCommand responseCommand, Connection conn) {
            this.release();
            if (responseCommand.getResponseStatus() != ResponseStatus.NO_ERROR) {
                StringBuilder sb = new StringBuilder();
                sb.append("onResponse. Status:").append(responseCommand.getResponseStatus());
                if (responseCommand instanceof BooleanCommand) {
                    sb.append(",Code:").append(((BooleanCommand)responseCommand).getCode());
                }
                if (responseCommand instanceof BooleanAckCommand) {
                    sb.append(",ErrorMsg:").append(((BooleanAckCommand)responseCommand).getErrorMsg());
                    sb.append(",ResponseHost:").append(((BooleanAckCommand)responseCommand).getResponseHost());
                }
                log.warn(sb.toString());
            }
        }

        /** Releases the capacity and logs the failure. */
        public void onException(Exception e) {
            this.release();
            log.warn(e);
        }

        /** Returns the capacity to the sliding window; later calls are no-ops. */
        private void release() {
            if (this.released.compareAndSet(false, true)) {
                AsyncMetaMessageProducer.this.slidingWindow.releaseByLenth(this.dataLenth);
            }
        }

        /** Supplies no dedicated executor for running this callback. */
        public ThreadPoolExecutor getExecutor() {
            return null;
        }
    }

    /**
     * Signals that a message could not be sent because a capacity limit was hit, either the
     * local sliding window or a limit reported by the remoting client.
     */
    public static class MetaMessageOverflowException
    extends NotifyRemotingException {
        private static final long serialVersionUID = -1842231102008256662L;

        public MetaMessageOverflowException(String string) {
            super(string);
        }

        public MetaMessageOverflowException(Throwable e) {
            super(e);
        }
    }
}

