/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.store.ha;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.ha.FlowMonitor;
import org.apache.rocketmq.store.ha.HAClient;
import org.apache.rocketmq.store.ha.HAConnectionState;

/**
 * Slave-side HA client: a service thread that connects to the master's HA
 * address, reads framed commit-log data from the socket, appends it to the
 * local {@link DefaultMessageStore} and reports the slave's max physical
 * offset back to the master.
 *
 * <p>Wire format consumed by {@link #dispatchReadRequest()}: a
 * {@value #MSG_HEADER_SIZE}-byte header (8-byte master physical offset followed
 * by a 4-byte body size) and then {@code bodySize} bytes of commit-log data.
 *
 * <p>The connection state is held in a {@code volatile} field; the remaining
 * mutable fields carry no synchronization of their own.
 */
public class DefaultHAClient extends ServiceThread implements HAClient {
    private static final Logger log = LoggerFactory.getLogger("RocketmqStore");

    /** Capacity of each of the two read buffers (4 MiB). */
    private static final int READ_MAX_BUFFER_SIZE = 0x400000;
    /** Transfer header size: 8-byte physical offset + 4-byte body size. */
    private static final int MSG_HEADER_SIZE = 12;
    /** Size of the offset report sent to the master (one {@code long}). */
    private static final int REPORT_OFFSET_SIZE = 8;
    /** Maximum number of write attempts for one offset report. */
    private static final int REPORT_WRITE_MAX_ATTEMPTS = 3;
    /** Number of consecutive zero-byte reads after which a read pass ends. */
    private static final int READ_ZERO_MAX_TIMES = 3;

    private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
    private final AtomicReference<String> masterAddress = new AtomicReference<>();
    private final ByteBuffer reportOffset = ByteBuffer.allocate(REPORT_OFFSET_SIZE);
    private SocketChannel socketChannel;
    private final Selector selector;
    private long lastReadTimestamp = System.currentTimeMillis();
    private long lastWriteTimestamp = System.currentTimeMillis();
    private long currentReportedOffset = 0L;
    /** Index in {@link #byteBufferRead} of the first byte not yet dispatched. */
    private int dispatchPosition = 0;
    private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
    private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
    private final DefaultMessageStore defaultMessageStore;
    private volatile HAConnectionState currentState = HAConnectionState.READY;
    private final FlowMonitor flowMonitor;

    /**
     * Creates the client, opening its selector and flow monitor.
     *
     * @param defaultMessageStore store that received data is appended to and
     *                            whose configuration drives the intervals used here
     * @throws IOException if the selector cannot be opened
     */
    public DefaultHAClient(DefaultMessageStore defaultMessageStore) throws IOException {
        this.selector = NetworkUtil.openSelector();
        this.defaultMessageStore = defaultMessageStore;
        this.flowMonitor = new FlowMonitor(defaultMessageStore.getMessageStoreConfig());
    }

    /**
     * Replaces the master HA address, logging the change when the
     * compare-and-set against the previously read value succeeds.
     *
     * @param newAddr new master HA address
     */
    @Override
    public void updateHaMasterAddress(String newAddr) {
        String currentAddr = this.masterHaAddress.get();
        if (this.masterHaAddress.compareAndSet(currentAddr, newAddr)) {
            log.info("update master ha address, OLD: " + currentAddr + " NEW: " + newAddr);
        }
    }

    /**
     * Replaces the master address, logging the change when the
     * compare-and-set against the previously read value succeeds.
     *
     * @param newAddr new master address
     */
    @Override
    public void updateMasterAddress(String newAddr) {
        String currentAddr = this.masterAddress.get();
        if (this.masterAddress.compareAndSet(currentAddr, newAddr)) {
            log.info("update master address, OLD: " + currentAddr + " NEW: " + newAddr);
        }
    }

    /** @return the currently configured master HA address, or {@code null} if unset */
    @Override
    public String getHaMasterAddress() {
        return this.masterHaAddress.get();
    }

    /** @return the currently configured master address, or {@code null} if unset */
    @Override
    public String getMasterAddress() {
        return this.masterAddress.get();
    }

    /**
     * @return {@code true} when more than the configured heartbeat interval has
     *         elapsed since the last offset report was written
     */
    private boolean isTimeToReportOffset() {
        long interval = this.defaultMessageStore.now() - this.lastWriteTimestamp;
        return interval > (long) this.defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
    }

    /**
     * Writes {@code maxOffset} to the master as an 8-byte value, retrying the
     * write up to {@value #REPORT_WRITE_MAX_ATTEMPTS} times while bytes remain.
     *
     * @param maxOffset offset to report
     * @return {@code true} if all 8 bytes were written; {@code false} on an
     *         I/O error or if bytes remain after the attempts
     */
    private boolean reportSlaveMaxOffset(long maxOffset) {
        this.reportOffset.position(0);
        this.reportOffset.limit(REPORT_OFFSET_SIZE);
        this.reportOffset.putLong(maxOffset);
        // Rewind so the value just stored is what gets written out.
        this.reportOffset.position(0);
        this.reportOffset.limit(REPORT_OFFSET_SIZE);
        for (int i = 0; i < REPORT_WRITE_MAX_ATTEMPTS && this.reportOffset.hasRemaining(); ++i) {
            try {
                this.socketChannel.write(this.reportOffset);
            } catch (IOException e) {
                log.error(this.getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                return false;
            }
        }
        this.lastWriteTimestamp = this.defaultMessageStore.getSystemClock().now();
        return !this.reportOffset.hasRemaining();
    }

    /**
     * Moves the not-yet-dispatched tail of the read buffer to the start of the
     * backup buffer, swaps the two buffers, and resets the dispatch position so
     * that reading can continue after the carried-over bytes.
     */
    private void reallocateByteBuffer() {
        int remain = READ_MAX_BUFFER_SIZE - this.dispatchPosition;
        if (remain > 0) {
            this.byteBufferRead.position(this.dispatchPosition);
            this.byteBufferBackup.position(0);
            this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
            this.byteBufferBackup.put(this.byteBufferRead);
        }
        this.swapByteBuffer();
        // New socket data is appended right after the carried-over bytes.
        this.byteBufferRead.position(remain);
        this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
        this.dispatchPosition = 0;
    }

    /** Exchanges the read buffer and the backup buffer. */
    private void swapByteBuffer() {
        ByteBuffer tmp = this.byteBufferRead;
        this.byteBufferRead = this.byteBufferBackup;
        this.byteBufferBackup = tmp;
    }

    /**
     * Reads from the socket into the read buffer and dispatches what arrived,
     * until the buffer is full or {@value #READ_ZERO_MAX_TIMES} consecutive
     * reads return zero bytes.
     *
     * @return {@code false} if dispatching failed, the read returned a negative
     *         size, or the read threw; {@code true} otherwise
     */
    private boolean processReadEvent() {
        int readSizeZeroTimes = 0;
        while (this.byteBufferRead.hasRemaining()) {
            try {
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    this.flowMonitor.addByteCountTransferred(readSize);
                    readSizeZeroTimes = 0;
                    if (!this.dispatchReadRequest()) {
                        log.error("HAClient, dispatchReadRequest error");
                        return false;
                    }
                    this.lastReadTimestamp = System.currentTimeMillis();
                } else if (readSize == 0) {
                    if (++readSizeZeroTimes >= READ_ZERO_MAX_TIMES) {
                        break;
                    }
                } else {
                    log.info("HAClient, processReadEvent read socket < 0");
                    return false;
                }
            } catch (IOException e) {
                log.info("HAClient, processReadEvent read socket exception", e);
                return false;
            }
        }
        return true;
    }

    /**
     * Consumes every complete frame currently buffered: appends its body to the
     * commit log, advances the dispatch position, and reports the new offset.
     * When the read buffer is full afterwards, the remaining partial frame is
     * carried over via {@link #reallocateByteBuffer()}.
     *
     * @return {@code false} if the master's offset does not match the slave's
     *         max physical offset, the header carries an impossible body size,
     *         or reporting the offset failed; {@code true} otherwise
     */
    private boolean dispatchReadRequest() {
        int readSocketPos = this.byteBufferRead.position();
        while (true) {
            int diff = this.byteBufferRead.position() - this.dispatchPosition;
            if (diff < MSG_HEADER_SIZE) {
                break;
            }
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            long slavePhyOffset = this.defaultMessageStore.getMaxPhyOffset();
            if (slavePhyOffset != 0L && slavePhyOffset != masterPhyOffset) {
                log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset);
                return false;
            }
            // A negative size would move dispatchPosition backwards, and a frame
            // larger than the buffer can never be fully received; both mean the
            // stream is corrupt, so fail the connection instead of continuing.
            if (bodySize < 0 || bodySize > READ_MAX_BUFFER_SIZE - MSG_HEADER_SIZE) {
                log.error("HAClient, invalid body size in transfer header, bodySize: {} masterPhyOffset: {}", bodySize, masterPhyOffset);
                return false;
            }
            if (diff < MSG_HEADER_SIZE + bodySize) {
                // Body not fully received yet; wait for more data.
                break;
            }
            byte[] bodyData = this.byteBufferRead.array();
            int dataStart = this.dispatchPosition + MSG_HEADER_SIZE;
            this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData, dataStart, bodySize);
            this.byteBufferRead.position(readSocketPos);
            this.dispatchPosition += MSG_HEADER_SIZE + bodySize;
            if (!this.reportSlaveMaxOffsetPlus()) {
                return false;
            }
        }
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }
        return true;
    }

    /**
     * Reports the store's max physical offset to the master if it has grown
     * past the last reported value; closes the connection when the report fails.
     *
     * @return {@code false} only if a report was attempted and failed
     */
    private boolean reportSlaveMaxOffsetPlus() {
        boolean result = true;
        long currentPhyOffset = this.defaultMessageStore.getMaxPhyOffset();
        if (currentPhyOffset > this.currentReportedOffset) {
            this.currentReportedOffset = currentPhyOffset;
            result = this.reportSlaveMaxOffset(this.currentReportedOffset);
            if (!result) {
                this.closeMaster();
                log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
            }
        }
        return result;
    }

    /**
     * Logs and sets the connection state.
     *
     * @param currentState new state
     */
    @Override
    public void changeCurrentState(HAConnectionState currentState) {
        log.info("change state to {}", currentState);
        this.currentState = currentState;
    }

    /**
     * Connects to the master HA address if no channel is open yet. On success
     * the channel is registered for read events and the state becomes
     * {@code TRANSFER}. Whenever no channel was open on entry, the reported
     * offset and last-read timestamp are re-initialised.
     *
     * @return {@code true} if a channel is open when the method returns
     * @throws ClosedChannelException if registering the channel with the selector fails
     */
    public boolean connectMaster() throws ClosedChannelException {
        if (null == this.socketChannel) {
            String addr = this.masterHaAddress.get();
            if (addr != null) {
                SocketAddress socketAddress = NetworkUtil.string2SocketAddress(addr);
                this.socketChannel = RemotingHelper.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                    log.info("HAClient connect to master {}", addr);
                    this.changeCurrentState(HAConnectionState.TRANSFER);
                }
            }
            this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();
            this.lastReadTimestamp = System.currentTimeMillis();
        }
        return this.socketChannel != null;
    }

    /**
     * Closes the connection to the master, if one is open, and resets the read
     * buffers, dispatch position and last-read timestamp. The channel reference
     * is dropped and the state returns to {@code READY} even when closing the
     * channel throws, so a dead channel is never retained.
     */
    @Override
    public void closeMaster() {
        if (null == this.socketChannel) {
            return;
        }
        try {
            SelectionKey sk = this.socketChannel.keyFor(this.selector);
            if (sk != null) {
                sk.cancel();
            }
            this.socketChannel.close();
            log.info("HAClient close connection with master {}", this.masterHaAddress.get());
        } catch (IOException e) {
            log.warn("closeMaster exception. ", e);
        } finally {
            // Always release the reference and leave TRANSFER: the buffers below
            // are reset unconditionally, so the channel must not outlive them.
            this.socketChannel = null;
            this.changeCurrentState(HAConnectionState.READY);
        }
        this.lastReadTimestamp = 0L;
        this.dispatchPosition = 0;
        this.byteBufferBackup.position(0);
        this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
        this.byteBufferRead.position(0);
        this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
    }

    /**
     * Service loop. Depending on the current state it connects to the master
     * ({@code READY}), transfers data ({@code TRANSFER}), exits
     * ({@code SHUTDOWN}) or idles (any other state). After a successful
     * transfer pass the connection is closed if nothing has been read for
     * longer than the configured housekeeping interval.
     */
    public void run() {
        log.info(this.getServiceName() + " service started");
        this.flowMonitor.start();
        while (!this.isStopped()) {
            try {
                switch (this.currentState) {
                    case SHUTDOWN:
                        return;
                    case READY:
                        if (!this.connectMaster()) {
                            log.warn("HAClient connect to master {} failed", this.masterHaAddress.get());
                            this.waitForRunning(5000L);
                        }
                        continue;
                    case TRANSFER:
                        if (!this.transferFromMaster()) {
                            this.closeMasterAndWait();
                            continue;
                        }
                        break;
                    default:
                        this.waitForRunning(2000L);
                        continue;
                }
                // Housekeeping: drop a connection the master has stopped feeding.
                long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;
                if (interval > (long) this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("AutoRecoverHAClient, master not response some time, so close connection");
                }
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
                this.closeMasterAndWait();
            }
        }
        log.info(this.getServiceName() + " service end");
    }

    /**
     * Performs one transfer pass: sends a heartbeat offset report when due,
     * waits up to one second on the selector, processes readable data and
     * reports the new offset if it advanced.
     *
     * @return {@code false} if reporting or reading failed
     * @throws IOException if the selector operation fails
     */
    private boolean transferFromMaster() throws IOException {
        if (this.isTimeToReportOffset()) {
            log.info("Slave report current offset {}", this.currentReportedOffset);
            if (!this.reportSlaveMaxOffset(this.currentReportedOffset)) {
                return false;
            }
        }
        this.selector.select(1000L);
        if (!this.processReadEvent()) {
            return false;
        }
        return this.reportSlaveMaxOffsetPlus();
    }

    /** Closes the master connection and then waits up to five seconds before returning. */
    public void closeMasterAndWait() {
        this.closeMaster();
        this.waitForRunning(5000L);
    }

    /** @return timestamp of the last offset report written to the master */
    @Override
    public long getLastWriteTimestamp() {
        return this.lastWriteTimestamp;
    }

    /** @return timestamp of the last successful read; {@code 0} after the connection was closed */
    @Override
    public long getLastReadTimestamp() {
        return this.lastReadTimestamp;
    }

    /** @return the current connection state */
    @Override
    public HAConnectionState getCurrentState() {
        return this.currentState;
    }

    /** @return the transferred-bytes-per-second figure from the flow monitor */
    @Override
    public long getTransferredByteInSecond() {
        return this.flowMonitor.getTransferredByteInSecond();
    }

    /**
     * Stops the client: marks the state {@code SHUTDOWN}, shuts down the flow
     * monitor and the service thread, closes the master connection and finally
     * closes the selector.
     */
    @Override
    public void shutdown() {
        this.changeCurrentState(HAConnectionState.SHUTDOWN);
        this.flowMonitor.shutdown();
        super.shutdown();
        this.closeMaster();
        try {
            this.selector.close();
        } catch (IOException e) {
            log.warn("Close the selector of AutoRecoverHAClient error, ", e);
        }
    }

    /**
     * @return the simple class name, prefixed with the broker identifier when
     *         the store reports it is running inside a broker container
     */
    public String getServiceName() {
        if (this.defaultMessageStore != null && this.defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
            return this.defaultMessageStore.getBrokerIdentity().getIdentifier() + DefaultHAClient.class.getSimpleName();
        }
        return DefaultHAClient.class.getSimpleName();
    }
}

