package io.moquette.spi.impl;

import io.moquette.interception.InterceptHandler;
import io.moquette.interception.messages.InterceptAcknowledgedMessage;
import io.moquette.parser.proto.messages.AbstractMessage;
import io.moquette.parser.proto.messages.ConnAckMessage;
import io.moquette.parser.proto.messages.ConnectMessage;
import io.moquette.parser.proto.messages.PubAckMessage;
import io.moquette.parser.proto.messages.PubCompMessage;
import io.moquette.parser.proto.messages.PubRecMessage;
import io.moquette.parser.proto.messages.PubRelMessage;
import io.moquette.parser.proto.messages.PublishMessage;
import io.moquette.parser.proto.messages.SubAckMessage;
import io.moquette.parser.proto.messages.SubscribeMessage;
import io.moquette.parser.proto.messages.UnsubAckMessage;
import io.moquette.parser.proto.messages.UnsubscribeMessage;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.server.netty.AutoFlushHandler;
import io.moquette.server.netty.NettyUtils;
import io.moquette.spi.ClientSession;
import io.moquette.spi.IMatchingCondition;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.MessageGUID;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.SubscriptionsStore;
import io.moquette.spi.security.IAuthenticator;
import io.moquette.spi.security.IAuthorizator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.IdleStateHandler;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/spi/impl/ProtocolProcessor.class */
public class ProtocolProcessor {
    /** Logger shared by every instance of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolProcessor.class);
    /** Connection descriptors keyed by client ID; (re)created by the nine-argument init. */
    protected ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors;
    /** SUBSCRIBE requests currently being handled, mapped to the phase each one has reached. */
    protected ConcurrentMap<RunningSubscription, SubscriptionState> subscriptionInCourse;
    /** Subscription store used to match topics and to add/remove subscriptions. */
    private SubscriptionsStore subscriptions;
    /** When true, a CONNECT that carries no user name is accepted by login. */
    private boolean allowAnonymous;
    /** When true, a clean-session CONNECT with an empty client ID is given a generated ID. */
    private boolean allowZeroByteClientId;
    /** Consulted for read permission on SUBSCRIBE and handed to the publish handlers. */
    private IAuthorizator m_authorizator;
    /** Store used for retained messages and for messages persisted for later delivery. */
    private IMessagesStore m_messagesStore;
    /** Store from which per-client sessions are loaded or created. */
    private ISessionsStore m_sessionsStore;
    /** Validates client ID / user name / password on CONNECT. */
    private IAuthenticator m_authenticator;
    /** Receives notifications about connects, disconnects, subscriptions and acknowledgements. */
    private BrokerInterceptor m_interceptor;
    /** Value supplied to init; forwarded to the QoS1/QoS2 handlers and printed in SUBSCRIBE logs. */
    private String m_server_port;
    /** Handler that PUBLISH packets with QoS MOST_ONE are dispatched to. */
    private Qos0PublishHandler qos0PublishHandler;
    /** Handler that PUBLISH packets with QoS LEAST_ONE are dispatched to. */
    private Qos1PublishHandler qos1PublishHandler;
    /** Handler that PUBLISH packets with QoS EXACTLY_ONCE and PUBREL packets are dispatched to. */
    private Qos2PublishHandler qos2PublishHandler;
    /** Delivers a stored message to the subscriptions that match its topic. */
    private MessagesPublisher messagesPublisher;
    /** Re-sends queued and retained messages to a single client session. */
    private InternalRepublisher internalRepublisher;
    /** Will messages registered at CONNECT time, keyed by client ID. */
    private ConcurrentMap<String, WillMessage> m_willStore = new ConcurrentHashMap();

    /* renamed from: io.moquette.spi.impl.ProtocolProcessor$2, reason: invalid class name */
    /* loaded from: input_file:io/moquette/spi/impl/ProtocolProcessor$2.class */
    /**
     * Lookup table that maps {@code QOSType.ordinal()} to the case numbers 1..3 used by the
     * {@code switch} in {@code processPublish}. The synthetic marker and the
     * {@code $SwitchMap$} name indicate this was emitted by the compiler for a switch over the
     * enum and surfaced by the decompiler; it is not hand-written logic.
     */
    static /* synthetic */ class AnonymousClass2 {
        // One slot per enum constant; constants that are not registered below keep the value 0,
        // which no case label uses.
        static final /* synthetic */ int[] $SwitchMap$io$moquette$parser$proto$messages$AbstractMessage$QOSType = new int[AbstractMessage.QOSType.values().length];

        static {
            // Each registration is isolated in its own try block so that a constant missing at
            // run time (NoSuchFieldError) only leaves its own slot unset.
            try {
                $SwitchMap$io$moquette$parser$proto$messages$AbstractMessage$QOSType[AbstractMessage.QOSType.MOST_ONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // intentionally ignored: the slot stays 0
            }
            try {
                $SwitchMap$io$moquette$parser$proto$messages$AbstractMessage$QOSType[AbstractMessage.QOSType.LEAST_ONE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // intentionally ignored: the slot stays 0
            }
            try {
                $SwitchMap$io$moquette$parser$proto$messages$AbstractMessage$QOSType[AbstractMessage.QOSType.EXACTLY_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // intentionally ignored: the slot stays 0
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/spi/impl/ProtocolProcessor$RunningSubscription.class */
    public class RunningSubscription {
        final String clientID;
        final long packetId;

        RunningSubscription(String str, long j) {
            this.clientID = str;
            this.packetId = j;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            RunningSubscription runningSubscription = (RunningSubscription) obj;
            if (this.packetId != runningSubscription.packetId) {
                return false;
            }
            return this.clientID != null ? this.clientID.equals(runningSubscription.clientID) : runningSubscription.clientID == null;
        }

        public int hashCode() {
            return (31 * (this.clientID != null ? this.clientID.hashCode() : 0)) + ((int) (this.packetId ^ (this.packetId >>> 32)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/spi/impl/ProtocolProcessor$SubscriptionState.class */
    /**
     * Phase reached by a SUBSCRIBE request tracked in {@code subscriptionInCourse}.
     * {@code processSubscribe} registers a request as VERIFIED, switches it to STORED once the
     * topic filters have been checked, and removes the entry when the request is finished.
     */
    public enum SubscriptionState {
        // Set after filter verification, just before the subscriptions are persisted.
        STORED,
        // Initial state, set when the SUBSCRIBE is first registered.
        VERIFIED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/moquette/spi/impl/ProtocolProcessor$WillMessage.class */
    /**
     * Immutable holder for the will a client registered on CONNECT: destination topic,
     * payload, retain flag and QoS. The payload buffer is stored as given (no copy is made).
     */
    public static final class WillMessage {
        private final String topic;
        private final ByteBuffer payload;
        private final boolean retained;
        private final AbstractMessage.QOSType qos;

        /**
         * @param str        topic the will is published to
         * @param byteBuffer will payload
         * @param z          whether the will is published as retained
         * @param qOSType    QoS the will is published with
         */
        public WillMessage(String str, ByteBuffer byteBuffer, boolean z, AbstractMessage.QOSType qOSType) {
            topic = str;
            payload = byteBuffer;
            retained = z;
            qos = qOSType;
        }

        /** @return topic the will is published to */
        public String getTopic() {
            return topic;
        }

        /** @return the payload buffer passed to the constructor */
        public ByteBuffer getPayload() {
            return payload;
        }

        /** @return whether the will is published as retained */
        public boolean isRetained() {
            return retained;
        }

        /** @return QoS the will is published with */
        public AbstractMessage.QOSType getQos() {
            return qos;
        }
    }

    /**
     * Convenience overload: initializes the processor with zero-byte client IDs disallowed
     * ({@code false}) and no server port ({@code null}).
     *
     * @param z whether anonymous clients are allowed
     */
    public void init(SubscriptionsStore subscriptionsStore, IMessagesStore iMessagesStore, ISessionsStore iSessionsStore, IAuthenticator iAuthenticator, boolean z, IAuthorizator iAuthorizator, BrokerInterceptor brokerInterceptor) {
        init(subscriptionsStore, iMessagesStore, iSessionsStore, iAuthenticator, z, false, iAuthorizator, brokerInterceptor, null);
    }

    /**
     * Convenience overload: initializes the processor without a server port ({@code null}).
     *
     * @param z  whether anonymous clients are allowed
     * @param z2 whether an empty client ID is allowed on clean-session CONNECTs
     */
    public void init(SubscriptionsStore subscriptionsStore, IMessagesStore iMessagesStore, ISessionsStore iSessionsStore, IAuthenticator iAuthenticator, boolean z, boolean z2, IAuthorizator iAuthorizator, BrokerInterceptor brokerInterceptor) {
        init(subscriptionsStore, iMessagesStore, iSessionsStore, iAuthenticator, z, z2, iAuthorizator, brokerInterceptor, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wires the processor: stores the collaborators, creates fresh (empty) connection and
     * in-progress-subscription registries, and builds the publish handlers and republisher.
     *
     * @param subscriptionsStore subscription store; its tree is dumped to the trace log
     * @param iMessagesStore     store for retained / persisted messages
     * @param iSessionsStore     store for client sessions
     * @param iAuthenticator     credential checker used on CONNECT
     * @param z                  whether anonymous clients are allowed
     * @param z2                 whether an empty client ID is allowed on clean-session CONNECTs
     * @param iAuthorizator      topic permission checker
     * @param brokerInterceptor  sink for broker event notifications
     * @param str                server port value forwarded to the QoS1/QoS2 handlers; may be null
     */
    public void init(SubscriptionsStore subscriptionsStore, IMessagesStore iMessagesStore, ISessionsStore iSessionsStore, IAuthenticator iAuthenticator, boolean z, boolean z2, IAuthorizator iAuthorizator, BrokerInterceptor brokerInterceptor, String str) {
        // Fresh registries on every call.
        connectionDescriptors = new ConcurrentHashMap<String, ConnectionDescriptor>();
        subscriptionInCourse = new ConcurrentHashMap<RunningSubscription, SubscriptionState>();

        // Collaborators and policy flags.
        m_interceptor = brokerInterceptor;
        subscriptions = subscriptionsStore;
        allowAnonymous = z;
        allowZeroByteClientId = z2;
        m_authorizator = iAuthorizator;
        LOG.trace("subscription tree on init {}", subscriptionsStore.dumpTree());
        m_authenticator = iAuthenticator;
        m_messagesStore = iMessagesStore;
        m_sessionsStore = iSessionsStore;
        m_server_port = str;

        // The sender is shared by the live publisher and the republisher.
        final PersistentQueueMessageSender queueSender = new PersistentQueueMessageSender(connectionDescriptors);
        messagesPublisher = new MessagesPublisher(connectionDescriptors, iSessionsStore, iMessagesStore, queueSender);

        // One handler per QoS level.
        qos0PublishHandler = new Qos0PublishHandler(
                iAuthorizator, subscriptionsStore, iMessagesStore, brokerInterceptor, messagesPublisher);
        qos1PublishHandler = new Qos1PublishHandler(
                iAuthorizator, subscriptionsStore, iMessagesStore, brokerInterceptor,
                connectionDescriptors, str, messagesPublisher);
        qos2PublishHandler = new Qos2PublishHandler(
                iAuthorizator, subscriptionsStore, iMessagesStore, brokerInterceptor,
                connectionDescriptors, iSessionsStore, str, messagesPublisher);

        internalRepublisher = new InternalRepublisher(queueSender);
    }

    /**
     * Handles a CONNECT packet: validates protocol version and client ID, authenticates,
     * registers the connection, then walks the descriptor through the states
     * SENDACK, SESSION_CREATED, MESSAGES_REPUBLISHED and ESTABLISHED (via the helper methods).
     * Any failed step closes the channel and stops processing.
     *
     * @param channel        channel the CONNECT arrived on
     * @param connectMessage the decoded CONNECT packet; its client ID may be replaced by a
     *                       generated one when it is empty
     */
    public void processConnect(Channel channel, ConnectMessage connectMessage) {
        LOG.info("CONNECT for client <{}>", connectMessage.getClientID());
        // Only protocol versions 3 and 4 are accepted; anything else is refused with return code 1.
        if (connectMessage.getProtocolVersion() != 3 && connectMessage.getProtocolVersion() != 4) {
            ConnAckMessage connAckMessage = new ConnAckMessage();
            connAckMessage.setReturnCode((byte) 1);
            LOG.warn("CONNECT sent bad proto ConnAck");
            channel.writeAndFlush(connAckMessage);
            channel.close();
            return;
        }
        if (connectMessage.getClientID() == null || connectMessage.getClientID().length() == 0) {
            // An empty client ID is only tolerated for clean sessions and when the broker is
            // configured to allow it; otherwise reply with return code 2 (identifier rejected).
            if (!connectMessage.isCleanSession() || !this.allowZeroByteClientId) {
                ConnAckMessage connAckMessage2 = new ConnAckMessage();
                connAckMessage2.setReturnCode((byte) 2);
                channel.writeAndFlush(connAckMessage2);
                channel.close();
                LOG.warn("CONNECT sent rejected identifier ConnAck");
                return;
            }
            // Generate an identifier (a random UUID with the dashes stripped) and write it back
            // into the message so the rest of the flow sees it.
            String replace = UUID.randomUUID().toString().replace("-", "");
            connectMessage.setClientID(replace);
            LOG.info("Client connected with server generated identifier: {}", replace);
        }
        // login() sends the failure CONNACK itself; here we only close the channel.
        if (!login(channel, connectMessage)) {
            channel.close();
            return;
        }
        String clientID = connectMessage.getClientID();
        ConnectionDescriptor connectionDescriptor = new ConnectionDescriptor(clientID, channel, connectMessage.isCleanSession());
        ConnectionDescriptor putIfAbsent = this.connectionDescriptors.putIfAbsent(clientID, connectionDescriptor);
        if (putIfAbsent != null) {
            // Another connection already uses this client ID: the existing one is aborted and
            // this CONNECT is abandoned (the new descriptor is not registered and no CONNACK
            // is sent on the new channel).
            LOG.info("Found an existing connection with same client ID <{}>, forcing to close", connectMessage.getClientID());
            putIfAbsent.abort();
            return;
        }
        initializeKeepAliveTimeout(channel, connectMessage);
        storeWillMessage(connectMessage);
        // DISCONNECTED -> SENDACK, writes the accepting CONNACK.
        if (!sendAck(connectionDescriptor, connectMessage)) {
            channel.close();
            return;
        }
        this.m_interceptor.notifyClientConnected(connectMessage);
        // SENDACK -> SESSION_CREATED; null means the state transition was refused.
        ClientSession createOrLoadClientSession = createOrLoadClientSession(connectionDescriptor, connectMessage);
        if (createOrLoadClientSession == null) {
            channel.close();
            return;
        }
        // SESSION_CREATED -> MESSAGES_REPUBLISHED, re-sending queued messages for non-clean sessions.
        if (!republish(connectionDescriptor, connectMessage, createOrLoadClientSession)) {
            channel.close();
            return;
        }
        if (connectionDescriptor.assignState(ConnectionDescriptor.ConnectionState.MESSAGES_REPUBLISHED, ConnectionDescriptor.ConnectionState.ESTABLISHED)) {
            LOG.info("Connection established");
        } else {
            channel.close();
        }
        // Logged in both outcomes of the final transition.
        LOG.info("CONNECT processed");
    }

    /**
     * Authenticates a CONNECT. Without a user name the outcome is decided by
     * {@code allowAnonymous} alone. With a user name the authenticator is consulted; a missing
     * password is passed as {@code null}, which is only permitted when anonymous access is on.
     * On success with a user name, the name is recorded on the channel. Every rejection sends
     * the bad-credentials CONNACK through {@code failedCredentials}.
     *
     * @return true when the client may proceed, false when it was rejected
     */
    private boolean login(Channel channel, ConnectMessage connectMessage) {
        if (connectMessage.isUserFlag()) {
            final byte[] password;
            if (connectMessage.isPasswordFlag()) {
                password = connectMessage.getPassword();
            } else if (this.allowAnonymous) {
                password = null;
            } else {
                failedCredentials(channel);
                return false;
            }

            final boolean accepted = this.m_authenticator.checkValid(
                    connectMessage.getClientID(), connectMessage.getUsername(), password);
            if (!accepted) {
                failedCredentials(channel);
                return false;
            }
            NettyUtils.userName(channel, connectMessage.getUsername());
            return true;
        }

        // No user name supplied: anonymous access decides.
        if (!this.allowAnonymous) {
            failedCredentials(channel);
            return false;
        }
        return true;
    }

    /**
     * Moves the descriptor from DISCONNECTED to SENDACK and writes an accepting CONNACK
     * (return code 0). If a session already exists for the client, its clean-session flag is
     * updated, and the CONNACK reports "session present" when the client did not ask for a
     * clean session.
     *
     * @return false when the state transition was refused (nothing is sent), true otherwise
     */
    private boolean sendAck(ConnectionDescriptor connectionDescriptor, ConnectMessage connectMessage) {
        final boolean transitioned = connectionDescriptor.assignState(
                ConnectionDescriptor.ConnectionState.DISCONNECTED,
                ConnectionDescriptor.ConnectionState.SENDACK);
        if (!transitioned) {
            return false;
        }

        final ConnAckMessage accepted = new ConnAckMessage();
        accepted.setReturnCode((byte) 0);

        final ClientSession existing = this.m_sessionsStore.sessionForClient(connectMessage.getClientID());
        if (existing != null) {
            if (!connectMessage.isCleanSession()) {
                accepted.setSessionPresent(true);
            }
            existing.cleanSession(connectMessage.isCleanSession());
        }

        connectionDescriptor.channel.writeAndFlush(accepted);
        return true;
    }

    /**
     * Records keep-alive, clean-session flag and client ID on the channel, then installs an
     * idle timeout of 1.5 times the keep-alive (rounded to the nearest integer).
     */
    private void initializeKeepAliveTimeout(Channel channel, ConnectMessage connectMessage) {
        final int keepAliveSeconds = connectMessage.getKeepAlive();
        LOG.debug("Connect with keepAlive {} s", keepAliveSeconds);

        NettyUtils.keepAlive(channel, keepAliveSeconds);
        NettyUtils.cleanSession(channel, connectMessage.isCleanSession());
        NettyUtils.clientID(channel, connectMessage.getClientID());
        LOG.debug("Connect create session <{}>", channel);

        final int idleTimeout = Math.round(keepAliveSeconds * 1.5f);
        setIdleTime(channel.pipeline(), idleTimeout);
    }

    /**
     * Registers the client's will, if the CONNECT carries one, under its client ID.
     * The will payload is copied into a new buffer positioned at its start.
     */
    private void storeWillMessage(ConnectMessage connectMessage) {
        if (!connectMessage.isWillFlag()) {
            return;
        }
        final AbstractMessage.QOSType willQos = AbstractMessage.QOSType.valueOf(connectMessage.getWillQos());
        final byte[] willBytes = connectMessage.getWillMessage();

        // Copy the bytes and rewind so the buffer is ready for reading.
        final ByteBuffer willPayload = ByteBuffer.allocate(willBytes.length);
        willPayload.put(willBytes);
        willPayload.flip();

        final WillMessage will = new WillMessage(
                connectMessage.getWillTopic(), willPayload, connectMessage.isWillRetain(), willQos);
        this.m_willStore.put(connectMessage.getClientID(), will);
        LOG.info("Session for clientID <{}> with will to topic {}", connectMessage.getClientID(), connectMessage.getWillTopic());
    }

    /**
     * Moves the descriptor from SENDACK to SESSION_CREATED and returns the client's session,
     * creating it when the store has none. For clean-session CONNECTs the session is cleaned
     * before being returned.
     *
     * @return the session, or null when the state transition was refused
     */
    private ClientSession createOrLoadClientSession(ConnectionDescriptor connectionDescriptor, ConnectMessage connectMessage) {
        final boolean transitioned = connectionDescriptor.assignState(
                ConnectionDescriptor.ConnectionState.SENDACK,
                ConnectionDescriptor.ConnectionState.SESSION_CREATED);
        if (!transitioned) {
            return null;
        }

        ClientSession session = this.m_sessionsStore.sessionForClient(connectMessage.getClientID());
        if (session == null) {
            LOG.debug("Create persistent session for clientID <{}>", connectMessage.getClientID());
            session = this.m_sessionsStore.createNewSession(
                    connectMessage.getClientID(), connectMessage.isCleanSession());
        }
        if (connectMessage.isCleanSession()) {
            session.cleanSession();
        }
        LOG.debug("Created session for client ID <{}> with clean session {}", connectMessage.getClientID(), connectMessage.isCleanSession());
        return session;
    }

    /**
     * Moves the descriptor from SESSION_CREATED to MESSAGES_REPUBLISHED. For non-clean
     * sessions the messages queued in the session are re-sent first; in all cases an
     * auto-flusher with a 500 ms period is then added to the channel pipeline.
     *
     * @return false when the state transition was refused, true otherwise
     */
    private boolean republish(ConnectionDescriptor connectionDescriptor, ConnectMessage connectMessage, ClientSession clientSession) {
        final boolean transitioned = connectionDescriptor.assignState(
                ConnectionDescriptor.ConnectionState.SESSION_CREATED,
                ConnectionDescriptor.ConnectionState.MESSAGES_REPUBLISHED);
        if (transitioned) {
            final boolean keepsSession = !connectMessage.isCleanSession();
            if (keepsSession) {
                republishStoredInSession(clientSession);
            }
            setupAutoFlusher(connectionDescriptor.channel.pipeline(), 500);
        }
        return transitioned;
    }

    /**
     * Sends a CONNACK with return code 4 (bad user name or password) and logs the rejection.
     * The channel is not closed here; that is left to the caller.
     */
    private void failedCredentials(Channel channel) {
        final ConnAckMessage rejection = new ConnAckMessage();
        rejection.setReturnCode((byte) 4);
        channel.writeAndFlush(rejection);

        LOG.info("Client {} failed to connect with bad username or password.", channel);
    }

    /**
     * Adds an {@code AutoFlushHandler} named "autoFlusher" right after the "idleEventHandler"
     * entry of the pipeline, or at the head of the pipeline when that entry does not exist.
     *
     * @param i flush period in milliseconds
     */
    private void setupAutoFlusher(ChannelPipeline channelPipeline, int i) {
        final String anchorName = "idleEventHandler";
        final String flusherName = "autoFlusher";
        try {
            channelPipeline.addAfter(anchorName, flusherName, new AutoFlushHandler(i, TimeUnit.MILLISECONDS));
        } catch (NoSuchElementException anchorMissing) {
            // Fallback: a brand-new handler instance is used rather than the one from the
            // failed attempt.
            channelPipeline.addFirst(flusherName, new AutoFlushHandler(i, TimeUnit.MILLISECONDS));
        }
    }

    /**
     * Installs an {@code IdleStateHandler} named "idleStateHandler" at the head of the
     * pipeline, replacing one already registered under that name. Only the all-idle timeout
     * is set; reader and writer idle times are 0.
     *
     * @param i all-idle timeout passed to {@code IdleStateHandler}
     */
    private void setIdleTime(ChannelPipeline channelPipeline, int i) {
        final String handlerName = "idleStateHandler";
        final boolean alreadyInstalled = channelPipeline.names().contains(handlerName);
        if (alreadyInstalled) {
            channelPipeline.remove(handlerName);
        }
        final IdleStateHandler idleHandler = new IdleStateHandler(0, 0, i);
        channelPipeline.addFirst(handlerName, idleHandler);
    }

    /**
     * Re-sends the messages waiting in the session's queue through the internal republisher;
     * does nothing beyond logging when the queue is empty.
     */
    private void republishStoredInSession(ClientSession clientSession) {
        LOG.trace("republishStoredInSession for client <{}>", clientSession);
        final BlockingQueue<IMessagesStore.StoredMessage> pending = clientSession.queue();
        if (!pending.isEmpty()) {
            LOG.info("republishing stored messages to client <{}>", clientSession.clientID);
            this.internalRepublisher.publishStored(clientSession, pending);
            return;
        }
        LOG.info("No stored messages for client <{}>", clientSession.clientID);
    }

    /**
     * Handles a PUBACK: marks the in-flight message with the given ID as acknowledged in the
     * client's session and notifies the interceptor.
     * <p>
     * A PUBACK that cannot be matched (no session for the client, or no in-flight message
     * under that ID) is logged and ignored instead of failing with a NullPointerException.
     *
     * @param channel       channel the PUBACK arrived on; supplies client ID and user name
     * @param pubAckMessage the decoded PUBACK packet
     */
    public void processPubAck(Channel channel, PubAckMessage pubAckMessage) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = pubAckMessage.getMessageID().intValue();
        String userName = NettyUtils.userName(channel);
        LOG.trace("retrieving inflight for messageID <{}>", Integer.valueOf(messageID));

        ClientSession session = this.m_sessionsStore.sessionForClient(clientID);
        if (session == null) {
            // The store returns null for unknown clients (other callers in this class check it).
            LOG.warn("PUBACK with messageID <{}> from client <{}> that has no session, ignoring", Integer.valueOf(messageID), clientID);
            return;
        }

        IMessagesStore.StoredMessage inflightMessage = session.getInflightMessage(messageID);
        session.inFlightAcknowledged(messageID);
        if (inflightMessage == null) {
            // NOTE(review): assumes getInflightMessage yields null for an unknown ID
            // (e.g. a duplicate PUBACK) - confirm against ClientSession.
            LOG.warn("PUBACK with messageID <{}> from client <{}> matches no inflight message, ignoring", Integer.valueOf(messageID), clientID);
            return;
        }
        this.m_interceptor.notifyMessageAcknowledged(
                new InterceptAcknowledgedMessage(inflightMessage, inflightMessage.getTopic(), userName));
    }

    /**
     * Converts a PUBLISH packet into a stored message, carrying over payload, QoS, topic,
     * retain flag and message ID. The payload is taken from the buffer's backing array.
     *
     * @param publishMessage the packet to convert
     * @return the newly created stored message
     */
    public static IMessagesStore.StoredMessage asStoredMessage(PublishMessage publishMessage) {
        final byte[] payloadBytes = publishMessage.getPayload().array();
        final IMessagesStore.StoredMessage converted = new IMessagesStore.StoredMessage(
                payloadBytes, publishMessage.getQos(), publishMessage.getTopicName());
        converted.setRetained(publishMessage.isRetainFlag());
        converted.setMessageID(publishMessage.getMessageID());
        return converted;
    }

    /**
     * Converts a will into a stored message, carrying over payload, QoS, topic and retain
     * flag. No message ID or client ID is set here.
     */
    private static IMessagesStore.StoredMessage asStoredMessage(WillMessage willMessage) {
        final byte[] payloadBytes = willMessage.getPayload().array();
        final IMessagesStore.StoredMessage converted = new IMessagesStore.StoredMessage(
                payloadBytes, willMessage.getQos(), willMessage.getTopic());
        converted.setRetained(willMessage.isRetained());
        return converted;
    }

    /**
     * Dispatches an incoming PUBLISH to the handler for its QoS level. Packets whose QoS is
     * none of MOST_ONE, LEAST_ONE or EXACTLY_ONCE are silently dropped.
     *
     * @param channel        channel the PUBLISH arrived on
     * @param publishMessage the decoded PUBLISH packet
     */
    public void processPublish(Channel channel, PublishMessage publishMessage) {
        LOG.info("PUB --PUBLISH--> SRV executePublish invoked with {}", publishMessage);
        final AbstractMessage.QOSType qos = publishMessage.getQos();
        switch (qos) {
            case MOST_ONE:
                this.qos0PublishHandler.receivedPublishQos0(channel, publishMessage);
                break;
            case LEAST_ONE:
                this.qos1PublishHandler.receivedPublishQos1(channel, publishMessage);
                break;
            case EXACTLY_ONCE:
                this.qos2PublishHandler.receivedPublishQos2(channel, publishMessage);
                break;
            default:
                // Any other QoS value: nothing to do.
                break;
        }
    }

    /**
     * Publishes a message that originates inside the broker (embedded use). The message is
     * attributed to the packet's client ID, or to "BROKER_SELF" when none is given, and is
     * always sent with message ID 1. EXACTLY_ONCE messages are persisted before delivery.
     * <p>
     * Retain handling: a retained message with QoS MOST_ONE or with no remaining payload
     * clears the retained entry for the topic; any other retained message is stored as the
     * topic's retained message.
     *
     * @param publishMessage the message to publish
     */
    public void internalPublish(PublishMessage publishMessage) {
        final AbstractMessage.QOSType qos = publishMessage.getQos();
        final String topic = publishMessage.getTopicName();
        LOG.info("embedded PUBLISH on topic <{}> with QoS {}", topic, qos);

        final IMessagesStore.StoredMessage toPublish = asStoredMessage(publishMessage);
        final boolean anonymousSender =
                publishMessage.getClientId() == null || publishMessage.getClientId().isEmpty();
        if (anonymousSender) {
            toPublish.setClientID("BROKER_SELF");
        } else {
            toPublish.setClientID(publishMessage.getClientId());
        }
        toPublish.setMessageID(1);

        MessageGUID guid = null;
        if (qos == AbstractMessage.QOSType.EXACTLY_ONCE) {
            guid = this.m_messagesStore.storePublishForFuture(toPublish);
        }
        this.messagesPublisher.publish2Subscribers(toPublish, this.subscriptions.matches(topic));

        if (!publishMessage.isRetainFlag()) {
            return;
        }
        final boolean clearsRetained =
                qos == AbstractMessage.QOSType.MOST_ONE || !publishMessage.getPayload().hasRemaining();
        if (clearsRetained) {
            this.m_messagesStore.cleanRetained(topic);
        } else {
            // Reuse the GUID from the EXACTLY_ONCE store above when there is one.
            if (guid == null) {
                guid = this.m_messagesStore.storePublishForFuture(toPublish);
            }
            this.m_messagesStore.storeRetained(topic, guid);
        }
    }

    /**
     * Publishes a client's will to the subscriptions matching its topic. A packet ID is
     * drawn from the sessions store unless the will's QoS is MOST_ONE, in which case the
     * message ID is left null.
     *
     * @param willMessage the will to publish
     * @param str         ID of the client the will belongs to
     */
    private void forwardPublishWill(WillMessage willMessage, String str) {
        final boolean needsPacketId = willMessage.getQos() != AbstractMessage.QOSType.MOST_ONE;
        final Integer packetId = needsPacketId
                ? Integer.valueOf(this.m_sessionsStore.nextPacketID(str))
                : null;

        final IMessagesStore.StoredMessage willAsStored = asStoredMessage(willMessage);
        willAsStored.setClientID(str);
        willAsStored.setMessageID(packetId);

        this.messagesPublisher.publish2Subscribers(
                willAsStored, this.subscriptions.matches(willAsStored.getTopic()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Caps a QoS at the level requested by the subscription.
     *
     * @param subscription subscription whose requested QoS is the upper bound
     * @param qOSType      QoS of the message being delivered
     * @return the subscription's requested QoS when {@code qOSType} has a greater byte value,
     *         otherwise {@code qOSType} unchanged
     */
    public static AbstractMessage.QOSType lowerQosToTheSubscriptionDesired(Subscription subscription, AbstractMessage.QOSType qOSType) {
        final boolean exceedsRequested =
                qOSType.byteValue() > subscription.getRequestedQos().byteValue();
        return exceedsRequested ? subscription.getRequestedQos() : qOSType;
    }

    /**
     * Handles a PUBREL by delegating to the QoS 2 publish handler.
     *
     * @param channel       channel the PUBREL arrived on
     * @param pubRelMessage the decoded PUBREL packet
     */
    public void processPubRel(Channel channel, PubRelMessage pubRelMessage) {
        this.qos2PublishHandler.processPubRel(channel, pubRelMessage);
    }

    /**
     * Handles a PUBREC: moves the in-flight message to the session's second-phase
     * acknowledgement wait and answers with a PUBREL (QoS LEAST_ONE) for the same message ID.
     *
     * @param channel       channel the PUBREC arrived on
     * @param pubRecMessage the decoded PUBREC packet
     */
    public void processPubRec(Channel channel, PubRecMessage pubRecMessage) {
        final String clientID = NettyUtils.clientID(channel);
        final ClientSession session = this.m_sessionsStore.sessionForClient(clientID);
        final int messageID = pubRecMessage.getMessageID().intValue();

        session.moveInFlightToSecondPhaseAckWaiting(messageID);
        LOG.debug("\t\tSRV <--PUBREC-- SUB processPubRec invoked for clientID {} ad messageID {}", clientID, messageID);

        final PubRelMessage release = new PubRelMessage();
        release.setMessageID(Integer.valueOf(messageID));
        release.setQos(AbstractMessage.QOSType.LEAST_ONE);
        channel.writeAndFlush(release);
    }

    /**
     * Handles a PUBCOMP: completes the second acknowledgement phase for the message ID in
     * the client's session and notifies the interceptor.
     * <p>
     * A PUBCOMP that cannot be matched (no session for the client, or no message waiting
     * under that ID) is logged and ignored instead of failing with a NullPointerException.
     *
     * @param channel        channel the PUBCOMP arrived on; supplies client ID and user name
     * @param pubCompMessage the decoded PUBCOMP packet
     */
    public void processPubComp(Channel channel, PubCompMessage pubCompMessage) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = pubCompMessage.getMessageID().intValue();
        LOG.debug("\t\tSRV <--PUBCOMP-- SUB processPubComp invoked for clientID {} ad messageID {}", clientID, Integer.valueOf(messageID));

        ClientSession session = this.m_sessionsStore.sessionForClient(clientID);
        if (session == null) {
            // The store returns null for unknown clients (other callers in this class check it).
            LOG.warn("PUBCOMP with messageID <{}> from client <{}> that has no session, ignoring", Integer.valueOf(messageID), clientID);
            return;
        }

        IMessagesStore.StoredMessage acknowledged = session.secondPhaseAcknowledged(messageID);
        if (acknowledged == null) {
            // NOTE(review): assumes secondPhaseAcknowledged yields null for an unknown ID
            // (e.g. a duplicate PUBCOMP) - confirm against ClientSession.
            LOG.warn("PUBCOMP with messageID <{}> from client <{}> matches no pending message, ignoring", Integer.valueOf(messageID), clientID);
            return;
        }
        String userName = NettyUtils.userName(channel);
        this.m_interceptor.notifyMessageAcknowledged(
                new InterceptAcknowledgedMessage(acknowledged, acknowledged.getTopic(), userName));
    }

    /**
     * Handles a DISCONNECT: flushes the channel, then walks the connection descriptor through
     * the teardown states (subscriptions removed, messages dropped, interceptors notified,
     * disconnected) and finally unregisters it. If the channel is not the one registered for
     * its client ID, or any teardown step is refused, the channel is simply closed.
     *
     * @param channel channel the DISCONNECT arrived on
     * @throws InterruptedException declared by the signature; not thrown by the visible code
     */
    public void processDisconnect(Channel channel) throws InterruptedException {
        channel.flush();
        final String clientID = NettyUtils.clientID(channel);
        final ConnectionDescriptor descriptor = this.connectionDescriptors.get(clientID);

        // Unknown client, or a descriptor that belongs to a different channel.
        if (descriptor == null || descriptor.channel != channel) {
            channel.close();
            return;
        }

        // The steps run strictly in this order; the first refusal stops the sequence.
        final boolean tornDown = removeSubscriptions(descriptor, clientID)
                && dropStoredMessages(descriptor, clientID)
                && cleanWillMessageAndNotifyInterceptor(descriptor, clientID);
        if (!tornDown) {
            channel.close();
            return;
        }

        if (closeChannel(descriptor) && this.connectionDescriptors.remove(clientID, descriptor)) {
            LOG.info("DISCONNECT client <{}> finished", clientID);
        }
    }

    /**
     * Moves the descriptor from ESTABLISHED to SUBSCRIPTIONS_REMOVED and, for clean-session
     * connections, wipes the client's subscriptions from the sessions store.
     *
     * @return false when the state transition was refused, true otherwise
     */
    private boolean removeSubscriptions(ConnectionDescriptor connectionDescriptor, String str) {
        final boolean transitioned = connectionDescriptor.assignState(
                ConnectionDescriptor.ConnectionState.ESTABLISHED,
                ConnectionDescriptor.ConnectionState.SUBSCRIPTIONS_REMOVED);
        if (transitioned && connectionDescriptor.cleanSession) {
            LOG.info("cleaning old saved subscriptions for client <{}>", str);
            this.m_sessionsStore.wipeSubscriptions(str);
            LOG.debug("Wiped subscriptions for client <{}>", str);
        }
        return transitioned;
    }

    /**
     * Moves the descriptor from SUBSCRIPTIONS_REMOVED to MESSAGES_DROPPED and, for
     * clean-session connections, drops the client's message queue from the sessions store.
     *
     * @return false when the state transition was refused, true otherwise
     */
    private boolean dropStoredMessages(ConnectionDescriptor connectionDescriptor, String str) {
        final boolean transitioned = connectionDescriptor.assignState(
                ConnectionDescriptor.ConnectionState.SUBSCRIPTIONS_REMOVED,
                ConnectionDescriptor.ConnectionState.MESSAGES_DROPPED);
        if (transitioned && connectionDescriptor.cleanSession) {
            LOG.debug("Removing messages in session's queue for client <{}>", str);
            this.m_sessionsStore.dropQueue(str);
            LOG.debug("Removed messages in session for client's queue <{}>", str);
        }
        return transitioned;
    }

    /**
     * Moves the descriptor from MESSAGES_DROPPED to INTERCEPTORS_NOTIFIED, discards the
     * client's will (so it is not published on this disconnect) and notifies the
     * interceptor that the client disconnected.
     *
     * @return false when the state transition was refused, true otherwise
     */
    private boolean cleanWillMessageAndNotifyInterceptor(ConnectionDescriptor connectionDescriptor, String str) {
        final boolean transitioned = connectionDescriptor.assignState(
                ConnectionDescriptor.ConnectionState.MESSAGES_DROPPED,
                ConnectionDescriptor.ConnectionState.INTERCEPTORS_NOTIFIED);
        if (transitioned) {
            this.m_willStore.remove(str);
            final String userName = NettyUtils.userName(connectionDescriptor.channel);
            this.m_interceptor.notifyClientDisconnected(str, userName);
        }
        return transitioned;
    }

    /**
     * Moves the descriptor from INTERCEPTORS_NOTIFIED to DISCONNECTED and closes its channel.
     *
     * @return false when the state transition was refused (channel left open), true otherwise
     */
    private boolean closeChannel(ConnectionDescriptor connectionDescriptor) {
        final boolean transitioned = connectionDescriptor.assignState(
                ConnectionDescriptor.ConnectionState.INTERCEPTORS_NOTIFIED,
                ConnectionDescriptor.ConnectionState.DISCONNECTED);
        if (transitioned) {
            connectionDescriptor.channel.close();
        }
        return transitioned;
    }

    /**
     * Handles an unexpected connection loss: unregisters the descriptor for this
     * client/channel pair, publishes the client's will if one is registered, and notifies
     * the interceptor.
     * <p>
     * The will is taken out of the store with a single atomic {@code remove}. A separate
     * contains/get/remove sequence could observe the will, lose it to a concurrent removal
     * (for instance a DISCONNECT being processed) and then publish {@code null}.
     *
     * @param str     ID of the client whose connection was lost
     * @param channel the channel that went away
     */
    public void processConnectionLost(String str, Channel channel) {
        // Relies on ConnectionDescriptor equality to match the registered descriptor.
        this.connectionDescriptors.remove(str, new ConnectionDescriptor(str, channel, true));

        // Claim the will atomically; only the caller that actually removed it publishes it.
        WillMessage will = this.m_willStore.remove(str);
        if (will != null) {
            forwardPublishWill(will, str);
        }
        this.m_interceptor.notifyClientConnectionLost(str, NettyUtils.userName(channel));
    }

    /**
     * Handles an UNSUBSCRIBE: removes each listed topic filter from the subscription store
     * and the client's session, notifies the interceptor per filter, and replies with an
     * UNSUBACK. On the first invalid filter the channel is closed and processing stops;
     * filters handled before it stay removed and no UNSUBACK is sent.
     *
     * @param channel            channel the UNSUBSCRIBE arrived on
     * @param unsubscribeMessage the decoded UNSUBSCRIBE packet
     */
    public void processUnsubscribe(Channel channel, UnsubscribeMessage unsubscribeMessage) {
        final List<String> topicFilters = unsubscribeMessage.topicFilters();
        final String clientID = NettyUtils.clientID(channel);
        LOG.debug("UNSUBSCRIBE subscription on topics {} for clientID <{}>", topicFilters, clientID);

        final ClientSession session = this.m_sessionsStore.sessionForClient(clientID);
        for (final String filter : topicFilters) {
            final boolean wellFormed = SubscriptionsStore.validate(filter);
            if (!wellFormed) {
                channel.close();
                LOG.warn("UNSUBSCRIBE found an invalid topic filter <{}> for clientID <{}>", filter, clientID);
                return;
            }
            this.subscriptions.removeSubscription(filter, clientID);
            session.unsubscribeFrom(filter);
            this.m_interceptor.notifyTopicUnsubscribed(filter, clientID, NettyUtils.userName(channel));
        }

        final int messageID = unsubscribeMessage.getMessageID().intValue();
        final UnsubAckMessage ack = new UnsubAckMessage();
        ack.setMessageID(Integer.valueOf(messageID));
        LOG.info("replying with UnsubAck to MSG ID {}", messageID);
        channel.writeAndFlush(ack);
    }

    /**
     * Handles a SUBSCRIBE: verifies each requested topic filter, stores the accepted
     * subscriptions, replies with a SUBACK and then delivers matching retained messages.
     * <p>
     * The request is tracked in {@code subscriptionInCourse} under (client ID, packet ID):
     * registered as VERIFIED, switched to STORED after verification, removed at the end.
     * If the same key is already registered, or the entry changed while the filters were
     * being verified, this call gives up without sending a SUBACK.
     *
     * @param channel          channel the SUBSCRIBE arrived on; supplies client ID and user name
     * @param subscribeMessage the decoded SUBSCRIBE packet
     */
    public void processSubscribe(Channel channel, SubscribeMessage subscribeMessage) {
        String clientID = NettyUtils.clientID(channel);
        LOG.info("SUBSCRIBE client <{}>", clientID);
        int messageID = subscribeMessage.getMessageID().intValue();
        LOG.debug("SUBSCRIBE client <{}> on server {} packetID {}", new Object[]{clientID, this.m_server_port, Integer.valueOf(messageID)});

        // Register the request; a non-null previous value means it is already being handled.
        RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
        if (this.subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED) != null) {
            LOG.debug("The client <{}> sent another SUBSCRIBE while this one was processing", clientID);
            return;
        }

        String userName = NettyUtils.userName(channel);
        List<SubscribeMessage.Couple> verifiedFilters = doVerify(clientID, userName, subscribeMessage);
        SubAckMessage ackMessage = doAckMessageFromValidateFilters(verifiedFilters);

        if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
            // The placeholder previously had no argument, so the client ID was never logged.
            LOG.debug("The client {} sent another SUBSCRIBE while this one was verifing topicFilters", clientID);
            return;
        }

        ackMessage.setMessageID(Integer.valueOf(messageID));
        List<Subscription> newSubscriptions = doStoreSubscription(verifiedFilters, clientID);
        LOG.debug("SUBACK for packetID {}", Integer.valueOf(messageID));
        if (LOG.isTraceEnabled()) {
            LOG.trace("subscription tree {}", this.subscriptions.dumpTree());
        }
        for (Subscription subscription : newSubscriptions) {
            LOG.debug("Persisting subscription {}", subscription);
            this.subscriptions.add(subscription.asClientTopicCouple());
        }

        // Acknowledge first, then push the retained messages for the new subscriptions.
        channel.writeAndFlush(ackMessage);
        for (Subscription subscription : newSubscriptions) {
            publishRetainedMessagesInSession(subscription, userName);
        }

        if (!this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED)) {
            LOG.warn("Failed to remove the descriptor, something bad happened");
        }
    }

    /**
     * Records in the client's session every verified filter that was not marked FAILURE.
     *
     * @param list verified (QoS, topic filter) couples
     * @param str  ID of the subscribing client
     * @return the subscriptions that were added to the session, in input order
     */
    private List<Subscription> doStoreSubscription(List<SubscribeMessage.Couple> list, String str) {
        final ClientSession session = this.m_sessionsStore.sessionForClient(str);
        final byte failure = AbstractMessage.QOSType.FAILURE.byteValue();
        final List<Subscription> stored = new ArrayList<Subscription>();
        for (final SubscribeMessage.Couple verified : list) {
            if (verified.qos == failure) {
                // Rejected during verification: nothing to store.
                continue;
            }
            final Subscription accepted = new Subscription(
                    str, verified.topicFilter, AbstractMessage.QOSType.valueOf(verified.qos));
            session.subscribe(accepted);
            stored.add(accepted);
        }
        return stored;
    }

    /**
     * Checks every requested subscription and returns one couple per request, in order.
     * A request keeps its QoS when the user may read the filter and the filter is valid;
     * otherwise its QoS is replaced by FAILURE.
     *
     * @param str              ID of the subscribing client
     * @param str2             user name of the subscribing client
     * @param subscribeMessage the SUBSCRIBE packet being verified
     * @return the resulting (QoS, topic filter) couples
     */
    private List<SubscribeMessage.Couple> doVerify(String str, String str2, SubscribeMessage subscribeMessage) {
        final ClientSession session = this.m_sessionsStore.sessionForClient(str);
        final List<SubscribeMessage.Couple> outcome = new ArrayList<SubscribeMessage.Couple>();
        for (final SubscribeMessage.Couple requested : subscribeMessage.subscriptions()) {
            final AbstractMessage.QOSType granted;
            if (!this.m_authorizator.canRead(requested.topicFilter, str2, session.clientID)) {
                LOG.debug("topic {} doesn't have read credentials", requested.topicFilter);
                granted = AbstractMessage.QOSType.FAILURE;
            } else if (SubscriptionsStore.validate(requested.topicFilter)) {
                granted = AbstractMessage.QOSType.valueOf(requested.qos);
            } else {
                granted = AbstractMessage.QOSType.FAILURE;
            }
            outcome.add(new SubscribeMessage.Couple(granted.byteValue(), requested.topicFilter));
        }
        return outcome;
    }

    /**
     * Builds a SUBACK carrying one QoS entry per verified couple, in order.
     * The message ID is not set here.
     */
    private SubAckMessage doAckMessageFromValidateFilters(List<SubscribeMessage.Couple> list) {
        final SubAckMessage ack = new SubAckMessage();
        for (final SubscribeMessage.Couple verified : list) {
            ack.addType(AbstractMessage.QOSType.valueOf(verified.qos));
        }
        return ack;
    }

    /**
     * Sends the subscriber every stored message whose topic matches the new subscription's
     * filter, then notifies the interceptor about the subscription.
     *
     * @param subscription the subscription that was just added
     * @param str          user name passed on to the interceptor
     */
    private void publishRetainedMessagesInSession(final Subscription subscription, String str) {
        LOG.debug("Publish persisted messages in session {}", subscription);

        // Selects stored topics that fall under the subscription's filter.
        final IMatchingCondition underFilter = new IMatchingCondition() {
            @Override
            public boolean match(String str2) {
                return SubscriptionsStore.matchTopics(str2, subscription.getTopicFilter());
            }
        };
        final Collection<IMessagesStore.StoredMessage> retained = this.m_messagesStore.searchMatching(underFilter);
        LOG.debug("Found {} messages to republish", retained.size());

        final ClientSession target = this.m_sessionsStore.sessionForClient(subscription.getClientId());
        this.internalRepublisher.publishRetained(target, retained);
        this.m_interceptor.notifyTopicSubscribed(subscription, str);
    }

    /**
     * Drains the client's session queue onto the channel for as long as the channel stays
     * writable and the queue has messages, then flushes the channel once.
     *
     * @param channel the channel that became writable
     */
    public void notifyChannelWritable(Channel channel) {
        final String clientID = NettyUtils.clientID(channel);
        final ClientSession session = this.m_sessionsStore.sessionForClient(clientID);

        while (channel.isWritable()) {
            final IMessagesStore.StoredMessage queued = session.queue().poll();
            if (queued == null) {
                // Queue exhausted.
                break;
            }
            // The last argument is whether the messages store still holds this message's GUID.
            channel.write(InternalRepublisher.createPublishForQos(
                    queued.getTopic(),
                    queued.getQos(),
                    queued.getMessage(),
                    this.m_messagesStore.getMessageByGuid(queued.getGuid()) != null));
        }
        channel.flush();
    }

    /**
     * Registers an intercept handler with the broker interceptor.
     *
     * @param interceptHandler the handler to register
     * @return whatever the interceptor's {@code addInterceptHandler} returns
     */
    public boolean addInterceptHandler(InterceptHandler interceptHandler) {
        return this.m_interceptor.addInterceptHandler(interceptHandler);
    }

    /**
     * Unregisters an intercept handler from the broker interceptor.
     *
     * @param interceptHandler the handler to unregister
     * @return whatever the interceptor's {@code removeInterceptHandler} returns
     */
    public boolean removeInterceptHandler(InterceptHandler interceptHandler) {
        return this.m_interceptor.removeInterceptHandler(interceptHandler);
    }
}
