/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query.netty;

import akka.dispatch.Futures;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.KvStateClientHandler;
import org.apache.flink.runtime.query.netty.KvStateClientHandlerCallback;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.Preconditions;
import scala.concurrent.Future;
import scala.concurrent.Promise;

public class KvStateClient {
    private final Bootstrap bootstrap;
    private final KvStateRequestStats stats;
    private final ConcurrentHashMap<KvStateServerAddress, EstablishedConnection> establishedConnections = new ConcurrentHashMap();
    private final ConcurrentHashMap<KvStateServerAddress, PendingConnection> pendingConnections = new ConcurrentHashMap();
    private final AtomicBoolean shutDown = new AtomicBoolean();

    public KvStateClient(int numEventLoopThreads, KvStateRequestStats stats) {
        Preconditions.checkArgument((numEventLoopThreads >= 1 ? 1 : 0) != 0, (Object)"Non-positive number of event loop threads.");
        NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flink KvStateClient Event Loop Thread %d").build();
        NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
        this.bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group((EventLoopGroup)nioGroup)).channel(NioSocketChannel.class)).option(ChannelOption.ALLOCATOR, (Object)bufferPool)).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)}).addLast(new ChannelHandler[]{new ChunkedWriteHandler()});
            }
        });
        this.stats = (KvStateRequestStats)Preconditions.checkNotNull((Object)stats, (String)"Statistics tracker");
    }

    public Future<byte[]> getKvState(KvStateServerAddress serverAddress, KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
        if (this.shutDown.get()) {
            return Futures.failed((Throwable)new IllegalStateException("Shut down"));
        }
        EstablishedConnection connection = this.establishedConnections.get(serverAddress);
        if (connection != null) {
            return connection.getKvState(kvStateId, serializedKeyAndNamespace);
        }
        PendingConnection pendingConnection = this.pendingConnections.get(serverAddress);
        if (pendingConnection != null) {
            return pendingConnection.getKvState(kvStateId, serializedKeyAndNamespace);
        }
        PendingConnection pending = new PendingConnection(serverAddress);
        PendingConnection previous = this.pendingConnections.putIfAbsent(serverAddress, pending);
        if (previous == null) {
            this.bootstrap.connect(serverAddress.getHost(), serverAddress.getPort()).addListener((GenericFutureListener)pending);
            return pending.getKvState(kvStateId, serializedKeyAndNamespace);
        }
        return previous.getKvState(kvStateId, serializedKeyAndNamespace);
    }

    public void shutDown() {
        if (this.shutDown.compareAndSet(false, true)) {
            EventLoopGroup group;
            for (Map.Entry<KvStateServerAddress, EstablishedConnection> entry : this.establishedConnections.entrySet()) {
                if (!this.establishedConnections.remove(entry.getKey(), entry.getValue())) continue;
                entry.getValue().close();
            }
            for (Map.Entry<KvStateServerAddress, Object> entry : this.pendingConnections.entrySet()) {
                if (this.pendingConnections.remove(entry.getKey()) == null) continue;
                ((PendingConnection)entry.getValue()).close();
            }
            if (this.bootstrap != null && (group = this.bootstrap.group()) != null) {
                group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
            }
        }
    }

    public void closeConnection(KvStateServerAddress serverAddress) {
        EstablishedConnection established;
        PendingConnection pending = this.pendingConnections.get(serverAddress);
        if (pending != null) {
            pending.close();
        }
        if ((established = this.establishedConnections.remove(serverAddress)) != null) {
            established.close();
        }
    }

    private class EstablishedConnection
    implements KvStateClientHandlerCallback {
        private final KvStateServerAddress serverAddress;
        private final Channel channel;
        private final ConcurrentHashMap<Long, PromiseAndTimestamp> pendingRequests = new ConcurrentHashMap();
        private final AtomicLong requestCount = new AtomicLong();
        private final AtomicReference<Throwable> failureCause = new AtomicReference();

        EstablishedConnection(KvStateServerAddress serverAddress, Channel channel) {
            this.serverAddress = (KvStateServerAddress)Preconditions.checkNotNull((Object)serverAddress, (String)"KvStateServerAddress");
            this.channel = (Channel)Preconditions.checkNotNull((Object)channel, (String)"Channel");
            channel.pipeline().addLast("KvStateClientHandler", (ChannelHandler)new KvStateClientHandler(this));
            KvStateClient.this.stats.reportActiveConnection();
        }

        void close() {
            this.close(new ClosedChannelException());
        }

        private boolean close(Throwable cause) {
            if (this.failureCause.compareAndSet(null, cause)) {
                this.channel.close();
                KvStateClient.this.stats.reportInactiveConnection();
                Iterator iterator = ((ConcurrentHashMap.KeySetView)this.pendingRequests.keySet()).iterator();
                while (iterator.hasNext()) {
                    long requestId = (Long)iterator.next();
                    PromiseAndTimestamp pending = this.pendingRequests.remove(requestId);
                    if (pending == null || !pending.promise.tryFailure(cause)) continue;
                    KvStateClient.this.stats.reportFailedRequest();
                }
                return true;
            }
            return false;
        }

        Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
            PromiseAndTimestamp requestPromiseTs = new PromiseAndTimestamp((Promise<byte[]>)Futures.promise(), System.nanoTime());
            try {
                PromiseAndTimestamp p;
                final long requestId = this.requestCount.getAndIncrement();
                this.pendingRequests.put(requestId, requestPromiseTs);
                KvStateClient.this.stats.reportRequest();
                ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequest(this.channel.alloc(), requestId, kvStateId, serializedKeyAndNamespace);
                this.channel.writeAndFlush((Object)buf).addListener((GenericFutureListener)new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        PromiseAndTimestamp pending;
                        if (!future.isSuccess() && (pending = (PromiseAndTimestamp)EstablishedConnection.this.pendingRequests.remove(requestId)) != null && pending.promise.tryFailure(future.cause())) {
                            KvStateClient.this.stats.reportFailedRequest();
                        }
                    }
                });
                Throwable failure = this.failureCause.get();
                if (failure != null && (p = this.pendingRequests.remove(requestId)) != null && p.promise.tryFailure(failure)) {
                    KvStateClient.this.stats.reportFailedRequest();
                }
            }
            catch (Throwable t) {
                requestPromiseTs.promise.tryFailure(t);
            }
            return requestPromiseTs.promise.future();
        }

        @Override
        public void onRequestResult(long requestId, byte[] serializedValue) {
            PromiseAndTimestamp pending = this.pendingRequests.remove(requestId);
            if (pending != null && pending.promise.trySuccess((Object)serializedValue)) {
                long durationMillis = (System.nanoTime() - pending.timestamp) / 1000000L;
                KvStateClient.this.stats.reportSuccessfulRequest(durationMillis);
            }
        }

        @Override
        public void onRequestFailure(long requestId, Throwable cause) {
            PromiseAndTimestamp pending = this.pendingRequests.remove(requestId);
            if (pending != null && pending.promise.tryFailure(cause)) {
                KvStateClient.this.stats.reportFailedRequest();
            }
        }

        @Override
        public void onFailure(Throwable cause) {
            if (this.close(cause)) {
                KvStateClient.this.establishedConnections.remove(this.serverAddress, this);
            }
        }

        public String toString() {
            return "EstablishedConnection{serverAddress=" + this.serverAddress + ", channel=" + this.channel + ", pendingRequests=" + this.pendingRequests.size() + ", requestCount=" + this.requestCount + ", failureCause=" + this.failureCause + '}';
        }

        private class PromiseAndTimestamp {
            private final Promise<byte[]> promise;
            private final long timestamp;

            public PromiseAndTimestamp(Promise<byte[]> promise, long timestamp) {
                this.promise = promise;
                this.timestamp = timestamp;
            }
        }
    }

    private class PendingConnection
    implements ChannelFutureListener {
        private final Object connectLock = new Object();
        private final KvStateServerAddress serverAddress;
        private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque();
        private EstablishedConnection established;
        private boolean closed;
        private Throwable failureCause;

        private PendingConnection(KvStateServerAddress serverAddress) {
            this.serverAddress = serverAddress;
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                this.handInChannel(future.channel());
            } else {
                this.close(future.cause());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
            Object object = this.connectLock;
            synchronized (object) {
                if (this.failureCause != null) {
                    return Futures.failed((Throwable)this.failureCause);
                }
                if (this.closed) {
                    return Futures.failed((Throwable)new ClosedChannelException());
                }
                if (this.established != null) {
                    return this.established.getKvState(kvStateId, serializedKeyAndNamespace);
                }
                PendingRequest pending = new PendingRequest(kvStateId, serializedKeyAndNamespace);
                this.queuedRequests.add(pending);
                return pending.promise.future();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handInChannel(Channel channel) {
            Object object = this.connectLock;
            synchronized (object) {
                if (this.closed || this.failureCause != null) {
                    channel.close();
                } else {
                    PendingRequest pending;
                    this.established = new EstablishedConnection(this.serverAddress, channel);
                    while ((pending = this.queuedRequests.poll()) != null) {
                        Future<byte[]> resultFuture = this.established.getKvState(pending.kvStateId, pending.serializedKeyAndNamespace);
                        pending.promise.completeWith(resultFuture);
                    }
                    KvStateClient.this.establishedConnections.put(this.serverAddress, this.established);
                    KvStateClient.this.pendingConnections.remove(this.serverAddress);
                    if (KvStateClient.this.shutDown.get() && KvStateClient.this.establishedConnections.remove(this.serverAddress, this.established)) {
                        this.established.close();
                    }
                }
            }
        }

        private void close() {
            this.close(new ClosedChannelException());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void close(Throwable cause) {
            Object object = this.connectLock;
            synchronized (object) {
                if (!this.closed) {
                    if (this.failureCause == null) {
                        this.failureCause = cause;
                    }
                    if (this.established != null) {
                        this.established.close();
                    } else {
                        PendingRequest pending;
                        while ((pending = this.queuedRequests.poll()) != null) {
                            pending.promise.tryFailure(cause);
                        }
                    }
                    this.closed = true;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public String toString() {
            Object object = this.connectLock;
            synchronized (object) {
                return "PendingConnection{serverAddress=" + this.serverAddress + ", queuedRequests=" + this.queuedRequests.size() + ", established=" + (this.established != null) + ", closed=" + this.closed + '}';
            }
        }

        private final class PendingRequest {
            private final KvStateID kvStateId;
            private final byte[] serializedKeyAndNamespace;
            private final Promise<byte[]> promise;

            private PendingRequest(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
                this.kvStateId = kvStateId;
                this.serializedKeyAndNamespace = serializedKeyAndNamespace;
                this.promise = Futures.promise();
            }
        }
    }
}

