/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.messaging.tcp.reactor;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadFactory;
import org.reactivestreams.Publisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.PassThroughPromiseToListenableFutureAdapter;
import org.springframework.messaging.tcp.reactor.Reactor2TcpConnection;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.Environment;
import reactor.core.config.ConfigurationReader;
import reactor.core.config.ReactorConfiguration;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.NetStreams;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.Reconnect;
import reactor.io.net.Spec;
import reactor.io.net.config.ClientSocketOptions;
import reactor.io.net.impl.netty.NettyClientSocketOptions;
import reactor.io.net.impl.netty.tcp.NettyTcpClient;
import reactor.io.net.tcp.TcpClient;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.action.Signal;

/**
 * {@link TcpOperations} implementation that creates one Reactor {@link TcpClient}
 * per {@code connect} call and keeps track of the clients that are still open so
 * that {@link #shutdown()} can stop them.
 *
 * <p>When created through {@link #Reactor2TcpClient(String, int, Codec)} the instance
 * owns an {@link EventLoopGroup} and an {@link Environment}, both of which are shut
 * down by {@link #shutdown()}. When created through
 * {@link #Reactor2TcpClient(NetStreams.TcpClientFactory)} neither is created and
 * {@link #shutdown()} only stops the tracked clients.
 *
 * <p>All access to the list of tracked clients and to the {@code stopping} flag is
 * guarded by the monitor of that list.
 *
 * @param <P> the payload type of the {@link Message}s exchanged over the connection
 */
public class Reactor2TcpClient<P> implements TcpOperations<P> {

    /** The Reactor client implementation handed to {@code NetStreams.tcpClient}. */
    @SuppressWarnings("rawtypes")
    public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;

    /**
     * The {@code eventLoopGroup} setter of {@link NettyClientSocketOptions}, looked up
     * reflectively by name and arity instead of being linked statically.
     * NOTE(review): presumably because its parameter type differs between Reactor
     * releases - confirm.
     */
    private static final Method eventLoopGroupMethod = initEventLoopGroupMethod();

    /** System property that overrides the number of I/O threads. */
    private static final String IO_THREAD_COUNT_PROPERTY = "reactor.tcp.ioThreadCount";

    /** Event loop group owned by this instance, or {@code null} if none was created. */
    private final EventLoopGroup eventLoopGroup;

    /** Reactor environment owned by this instance, or {@code null} if none was created. */
    private final Environment environment;

    private final NetStreams.TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory;

    /** Clients that have been started and not yet cleaned up; also serves as the lock. */
    private final List<TcpClient<Message<P>, Message<P>>> tcpClients = new ArrayList<>();

    /** Set once {@link #shutdown()} has been called; guarded by {@link #tcpClients}. */
    private boolean stopping;


    /**
     * Create a client that connects to the given host and port using the given codec.
     * A dedicated event loop group and Reactor environment are created and are shut
     * down again by {@link #shutdown()}.
     *
     * @param host the host to connect to
     * @param port the port to connect to
     * @param codec the codec used to encode and decode messages
     */
    public Reactor2TcpClient(final String host, final int port,
            final Codec<Buffer, Message<P>, Message<P>> codec) {

        final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup();
        this.eventLoopGroup = nioEventLoopGroup;
        this.environment = new Environment(new SynchronousDispatcherConfigReader());

        this.tcpClientSpecFactory = new NetStreams.TcpClientFactory<Message<P>, Message<P>>() {

            @Override
            public Spec.TcpClientSpec<Message<P>, Message<P>> apply(
                    Spec.TcpClientSpec<Message<P>, Message<P>> spec) {

                return spec
                        .env(Reactor2TcpClient.this.environment)
                        .codec(codec)
                        .connect(host, port)
                        .options(createClientSocketOptions());
            }

            // Fresh socket options per client, all sharing this instance's event loop group.
            private ClientSocketOptions createClientSocketOptions() {
                return (ClientSocketOptions) ReflectionUtils.invokeMethod(
                        eventLoopGroupMethod, new NettyClientSocketOptions(), nioEventLoopGroup);
            }
        };
    }

    /**
     * Create a client that configures every new {@link TcpClient} through the given
     * factory. No event loop group or environment is created by this constructor, so
     * {@link #shutdown()} only stops the tracked clients.
     *
     * @param tcpClientSpecFactory the factory used to configure each client
     * @throws IllegalArgumentException if the factory is {@code null}
     */
    public Reactor2TcpClient(NetStreams.TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory) {
        Assert.notNull(tcpClientSpecFactory, "'tcpClientSpecFactory' must not be null");
        this.tcpClientSpecFactory = tcpClientSpecFactory;
        this.eventLoopGroup = null;
        this.environment = null;
    }


    /**
     * Create the event loop group used for I/O. The thread count comes from the
     * {@code reactor.tcp.ioThreadCount} system property; if that is missing, not a
     * number, not readable or not positive, the number of available processors is used.
     */
    private static NioEventLoopGroup initEventLoopGroup() {
        int ioThreadCount;
        try {
            ioThreadCount = Integer.parseInt(System.getProperty(IO_THREAD_COUNT_PROPERTY));
        }
        catch (NumberFormatException | SecurityException ex) {
            // Unset, malformed or inaccessible property: fall back to the default below.
            ioThreadCount = -1;
        }
        if (ioThreadCount <= 0) {
            ioThreadCount = Runtime.getRuntime().availableProcessors();
        }
        return new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
    }

    /**
     * Find the single-argument {@code eventLoopGroup} method on
     * {@link NettyClientSocketOptions}.
     *
     * @throws IllegalStateException if no such method exists
     */
    private static Method initEventLoopGroupMethod() {
        for (Method method : NettyClientSocketOptions.class.getMethods()) {
            if (method.getName().equals("eventLoopGroup") && method.getParameterCount() == 1) {
                return method;
            }
        }
        throw new IllegalStateException("No compatible Reactor version found");
    }


    /**
     * Open a connection and report its lifecycle to the given handler.
     *
     * <p>If {@link #shutdown()} has already been called, the handler is notified
     * through {@code afterConnectFailure} and an already-failed future is returned.
     * If starting the client fails, the client is removed from the tracked list
     * before the handler is notified.
     *
     * @param connectionHandler the handler to notify; must not be {@code null}
     * @return a future adapting the promise returned by starting the client
     */
    @Override
    public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) {
        Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");

        final TcpClient<Message<P>, Message<P>> tcpClient = registerNewClient();
        if (tcpClient == null) {
            return rejectConnect(connectionHandler);
        }
        final Runnable cleanupTask = cleanupTaskFor(tcpClient);

        Promise<Void> promise = tcpClient.start(
                new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask));

        return new PassThroughPromiseToListenableFutureAdapter<Void>(
                promise.onError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable ex) {
                        cleanupTask.run();
                        connectionHandler.afterConnectFailure(ex);
                    }
                }));
    }

    /**
     * Open a connection that is re-established according to the given strategy.
     *
     * <p>If {@link #shutdown()} has already been called, the handler is notified
     * through {@code afterConnectFailure} and an already-failed future is returned.
     *
     * @param connectionHandler the handler to notify; must not be {@code null}
     * @param strategy decides the delay before each reconnect attempt; must not be {@code null}
     * @return a future adapting {@code next().after()} of the stream returned by
     * starting the client
     */
    @Override
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy strategy) {
        Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
        Assert.notNull(strategy, "ReconnectStrategy must not be null");

        TcpClient<Message<P>, Message<P>> tcpClient = registerNewClient();
        if (tcpClient == null) {
            return rejectConnect(connectionHandler);
        }
        Runnable cleanupTask = cleanupTaskFor(tcpClient);

        Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start(
                new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask),
                new ReactorReconnectAdapter(strategy));

        return new PassThroughPromiseToListenableFutureAdapter<Void>(stream.next().after());
    }

    /**
     * Stop accepting new connections and shut down, in this order: every tracked
     * client, then the event loop group (if this instance owns one), then the
     * environment (if this instance owns one).
     *
     * @return a future adapting the promise of the last asynchronous step that applies
     */
    @Override
    public ListenableFuture<Void> shutdown() {
        // Snapshot under the lock: cleanup tasks may remove clients from the live list
        // on other threads while the stream below walks the clients.
        final List<TcpClient<Message<P>, Message<P>>> clientsToStop;
        synchronized (this.tcpClients) {
            this.stopping = true;
            clientsToStop = new ArrayList<>(this.tcpClients);
        }

        Promise<Void> promise = Streams.from(clientsToStop)
                .flatMap(new Function<TcpClient<Message<P>, Message<P>>, Promise<Void>>() {
                    @Override
                    public Promise<Void> apply(final TcpClient<Message<P>, Message<P>> client) {
                        return client.shutdown().onComplete(new Consumer<Promise<Void>>() {
                            @Override
                            public void accept(Promise<Void> voidPromise) {
                                removeClient(client);
                            }
                        });
                    }
                })
                .next();

        if (this.eventLoopGroup != null) {
            final Promise<Void> eventLoopPromise = Promises.prepare();
            promise.onComplete(new Consumer<Promise<Void>>() {
                @Override
                public void accept(Promise<Void> voidPromise) {
                    Reactor2TcpClient.this.eventLoopGroup.shutdownGracefully().addListener(
                            new FutureListener<Object>() {
                                @Override
                                public void operationComplete(Future<Object> future) throws Exception {
                                    if (future.isSuccess()) {
                                        eventLoopPromise.onComplete();
                                    }
                                    else {
                                        eventLoopPromise.onError(future.cause());
                                    }
                                }
                            });
                }
            });
            // From here on the returned future tracks the event loop shutdown instead.
            promise = eventLoopPromise;
        }

        if (this.environment != null) {
            promise.onComplete(new Consumer<Promise<Void>>() {
                @Override
                public void accept(Promise<Void> voidPromise) {
                    Reactor2TcpClient.this.environment.shutdown();
                }
            });
        }

        return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
    }


    /**
     * Create a new client and add it to the tracked list, unless shutdown has begun.
     *
     * @return the new client, or {@code null} if {@link #shutdown()} was already called
     */
    private TcpClient<Message<P>, Message<P>> registerNewClient() {
        synchronized (this.tcpClients) {
            if (this.stopping) {
                return null;
            }
            TcpClient<Message<P>, Message<P>> tcpClient =
                    NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
            this.tcpClients.add(tcpClient);
            return tcpClient;
        }
    }

    /** Notify the handler that no connection is made during shutdown and return a failed future. */
    private ListenableFuture<Void> rejectConnect(TcpConnectionHandler<P> connectionHandler) {
        IllegalStateException ex = new IllegalStateException("Shutting down.");
        connectionHandler.afterConnectFailure(ex);
        return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>error(ex));
    }

    /** Return a task that removes the given client from the tracked list when run. */
    private Runnable cleanupTaskFor(final TcpClient<Message<P>, Message<P>> tcpClient) {
        return new Runnable() {
            @Override
            public void run() {
                removeClient(tcpClient);
            }
        };
    }

    /** Remove the given client from the tracked list while holding the list's monitor. */
    private void removeClient(TcpClient<Message<P>, Message<P>> tcpClient) {
        synchronized (this.tcpClients) {
            this.tcpClients.remove(tcpClient);
        }
    }


    /**
     * Adapts a Spring {@link ReconnectStrategy} to Reactor's {@link Reconnect}.
     */
    private static class ReactorReconnectAdapter implements Reconnect {

        private final ReconnectStrategy strategy;

        public ReactorReconnectAdapter(ReconnectStrategy strategy) {
            this.strategy = strategy;
        }

        /**
         * Ask the strategy for the delay before the given attempt and pair it with
         * the unchanged address.
         *
         * @return the address and delay, or {@code null} if the strategy returned no delay
         */
        @Override
        public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
            Long delay = this.strategy.getTimeToNextAttempt(attempt);
            // NOTE(review): a null delay is understood to mean "stop reconnecting", and a
            // null tuple to be Reactor's way of expressing that - confirm against the
            // ReconnectStrategy and Reconnect Javadoc. Wrapping null in a tuple would
            // hand the caller a tuple whose delay element is null.
            return (delay != null ? Tuple.of(address, delay) : null);
        }
    }


    /**
     * Channel handler that forwards connection events and inbound messages of one
     * channel to a {@link TcpConnectionHandler}.
     */
    private static class MessageChannelStreamHandler<P>
            implements ReactorChannelHandler<Message<P>, Message<P>, ChannelStream<Message<P>, Message<P>>> {

        private final TcpConnectionHandler<P> connectionHandler;

        private final Runnable cleanupTask;

        public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler, Runnable cleanupTask) {
            this.connectionHandler = connectionHandler;
            this.cleanupTask = cleanupTask;
        }

        /**
         * Announce the new connection to the handler, then subscribe to the channel:
         * each message goes to {@code handleMessage}; when the stream terminates the
         * cleanup task runs and the handler is told about the error or the close.
         *
         * @return the promise that was handed to the {@link Reactor2TcpConnection}
         */
        @Override
        public Publisher<Void> apply(ChannelStream<Message<P>, Message<P>> channelStream) {
            Promise<Void> closePromise = Promises.prepare();
            this.connectionHandler.afterConnected(new Reactor2TcpConnection<P>(channelStream, closePromise));
            channelStream
                    .finallyDo(new Consumer<Signal<Message<P>>>() {
                        @Override
                        public void accept(Signal<Message<P>> signal) {
                            cleanupTask.run();
                            if (signal.isOnError()) {
                                connectionHandler.handleFailure(signal.getThrowable());
                            }
                            else if (signal.isOnComplete()) {
                                connectionHandler.afterConnectionClosed();
                            }
                        }
                    })
                    .consume(new Consumer<Message<P>>() {
                        @Override
                        public void accept(Message<P> message) {
                            connectionHandler.handleMessage(message);
                        }
                    });
            return closePromise;
        }
    }


    /**
     * Supplies the configuration for the {@link Environment} created by this class:
     * no dispatcher configurations, the name {@code "sync"} and no extra properties.
     * NOTE(review): the second constructor argument is presumably the default
     * dispatcher name - confirm against {@link ReactorConfiguration}.
     */
    private static class SynchronousDispatcherConfigReader implements ConfigurationReader {

        @Override
        public ReactorConfiguration read() {
            return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties());
        }
    }
}

