/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.test.rule;

import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.ServerSocketFactory;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.NotRunning;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.rules.ExternalResource;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaRule;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import scala.Option;
import scala.collection.Seq;

public class KafkaEmbedded
extends ExternalResource
implements KafkaRule,
InitializingBean,
DisposableBean {
    private static final Log logger = LogFactory.getLog(KafkaEmbedded.class);
    public static final String BEAN_NAME = "kafkaEmbedded";
    public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
    public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect";
    public static final long METADATA_PROPAGATION_TIMEOUT = 10000L;
    private final int count;
    private final boolean controlledShutdown;
    private final String[] topics;
    private final int partitionsPerTopic;
    private final List<KafkaServer> kafkaServers = new ArrayList<KafkaServer>();
    private EmbeddedZookeeper zookeeper;
    private ZkClient zookeeperClient;
    private String zkConnect;
    private Map<String, String> brokerProperties;
    private int[] kafkaPorts;

    public KafkaEmbedded(int count) {
        this(count, false, new String[0]);
    }

    public KafkaEmbedded(int count, boolean controlledShutdown, String ... topics) {
        this(count, controlledShutdown, 2, topics);
    }

    public KafkaEmbedded(int count, boolean controlledShutdown, int partitions, String ... topics) {
        this.count = count;
        this.controlledShutdown = controlledShutdown;
        this.topics = topics != null ? topics : new String[0];
        this.partitionsPerTopic = partitions;
    }

    public KafkaEmbedded brokerProperties(Map<String, String> brokerProperties) {
        this.brokerProperties = brokerProperties;
        return this;
    }

    public void setKafkaPorts(int ... kafkaPorts) {
        this.kafkaPorts = kafkaPorts;
    }

    public void afterPropertiesSet() throws Exception {
        this.before();
    }

    public void before() throws Exception {
        this.startZookeeper();
        int zkConnectionTimeout = 6000;
        int zkSessionTimeout = 6000;
        this.zkConnect = "127.0.0.1:" + this.zookeeper.port();
        this.zookeeperClient = new ZkClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout, (ZkSerializer)ZKStringSerializer$.MODULE$);
        this.kafkaServers.clear();
        for (int i = 0; i < this.count; ++i) {
            Integer port;
            Integer n = port = this.kafkaPorts != null && this.kafkaPorts.length > i ? Integer.valueOf(this.kafkaPorts[i]) : null;
            if (port == null) {
                ServerSocket ss = ServerSocketFactory.getDefault().createServerSocket(0);
                port = ss.getLocalPort();
                ss.close();
            }
            Properties brokerConfigProperties = TestUtils.createBrokerConfig((int)i, (String)this.zkConnect, (boolean)this.controlledShutdown, (boolean)true, (int)port, (Option)Option.apply(null), (Option)Option.apply(null), (Option)Option.apply(null), (boolean)true, (boolean)false, (int)0, (boolean)false, (int)0, (boolean)false, (int)0, (Option)Option.apply(null), (int)1);
            brokerConfigProperties.setProperty(KafkaConfig$.MODULE$.PortProp(), "" + port);
            brokerConfigProperties.setProperty("replica.socket.timeout.ms", "1000");
            brokerConfigProperties.setProperty("controller.socket.timeout.ms", "1000");
            brokerConfigProperties.setProperty("offsets.topic.replication.factor", "1");
            if (this.brokerProperties != null) {
                this.brokerProperties.forEach(brokerConfigProperties::setProperty);
            }
            KafkaServer server = TestUtils.createServer((KafkaConfig)new KafkaConfig((Map)brokerConfigProperties), (Time)Time.SYSTEM);
            this.kafkaServers.add(server);
        }
        HashMap<String, String> adminConfigs = new HashMap<String, String>();
        adminConfigs.put("bootstrap.servers", this.getBrokersAsString());
        AdminClient admin = AdminClient.create(adminConfigs);
        List newTopics = Arrays.stream(this.topics).map(t -> new NewTopic(t, this.partitionsPerTopic, (short)this.count)).collect(Collectors.toList());
        CreateTopicsResult createTopics = admin.createTopics(newTopics);
        createTopics.all().get();
        admin.close();
        System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, this.getBrokersAsString());
        System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, this.getZookeeperConnectionString());
    }

    public void destroy() throws Exception {
        this.after();
    }

    public void after() {
        System.getProperties().remove(SPRING_EMBEDDED_KAFKA_BROKERS);
        System.getProperties().remove(SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
        for (KafkaServer kafkaServer : this.kafkaServers) {
            try {
                if (kafkaServer.brokerState().currentState() != NotRunning.state()) {
                    kafkaServer.shutdown();
                    kafkaServer.awaitShutdown();
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            try {
                CoreUtils.delete((Seq)kafkaServer.config().logDirs());
            }
            catch (Exception exception) {}
        }
        try {
            this.zookeeperClient.close();
        }
        catch (ZkInterruptedException zkInterruptedException) {
            // empty catch block
        }
        try {
            this.zookeeper.shutdown();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public List<KafkaServer> getKafkaServers() {
        return this.kafkaServers;
    }

    public KafkaServer getKafkaServer(int id) {
        return this.kafkaServers.get(id);
    }

    public EmbeddedZookeeper getZookeeper() {
        return this.zookeeper;
    }

    @Override
    public ZkClient getZkClient() {
        return this.zookeeperClient;
    }

    @Override
    public String getZookeeperConnectionString() {
        return this.zkConnect;
    }

    public BrokerAddress getBrokerAddress(int i) {
        KafkaServer kafkaServer = this.kafkaServers.get(i);
        return new BrokerAddress("127.0.0.1", kafkaServer.config().port());
    }

    @Override
    public BrokerAddress[] getBrokerAddresses() {
        ArrayList<BrokerAddress> addresses = new ArrayList<BrokerAddress>();
        for (KafkaServer kafkaServer : this.kafkaServers) {
            addresses.add(new BrokerAddress("127.0.0.1", kafkaServer.config().port()));
        }
        return addresses.toArray(new BrokerAddress[addresses.size()]);
    }

    @Override
    public int getPartitionsPerTopic() {
        return this.partitionsPerTopic;
    }

    public void bounce(BrokerAddress brokerAddress) {
        for (KafkaServer kafkaServer : this.getKafkaServers()) {
            if (!brokerAddress.equals(new BrokerAddress(kafkaServer.config().hostName(), kafkaServer.config().port()))) continue;
            kafkaServer.shutdown();
            kafkaServer.awaitShutdown();
        }
    }

    public void startZookeeper() {
        this.zookeeper = new EmbeddedZookeeper();
    }

    @Deprecated
    public void bounce(int index, boolean waitForPropagation) {
        throw new UnsupportedOperationException();
    }

    @Deprecated
    public void bounce(int index) {
        this.bounce(index, true);
    }

    public void restart(final int index) throws Exception {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(10, Collections.singletonMap(Exception.class, true));
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(100L);
        backOffPolicy.setMaxInterval(1000L);
        backOffPolicy.setMultiplier(2.0);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy((RetryPolicy)retryPolicy);
        retryTemplate.setBackOffPolicy((BackOffPolicy)backOffPolicy);
        retryTemplate.execute((RetryCallback)new RetryCallback<Void, Exception>(){

            public Void doWithRetry(RetryContext context) throws Exception {
                ((KafkaServer)KafkaEmbedded.this.kafkaServers.get(index)).startup();
                return null;
            }
        });
    }

    @Deprecated
    public void waitUntilSynced(String topic, int brokerId) {
        throw new UnsupportedOperationException();
    }

    @Override
    public String getBrokersAsString() {
        StringBuilder builder = new StringBuilder();
        for (BrokerAddress brokerAddress : this.getBrokerAddresses()) {
            builder.append(brokerAddress.toString()).append(',');
        }
        return builder.substring(0, builder.length() - 1);
    }

    @Override
    public boolean isEmbedded() {
        return true;
    }

    public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) throws Exception {
        final CountDownLatch consumerLatch = new CountDownLatch(1);
        consumer.subscribe(Arrays.asList(this.topics), new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                consumerLatch.countDown();
            }
        });
        consumer.poll(0L);
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)consumerLatch.await(30L, TimeUnit.SECONDS)).as("Failed to be assigned partitions from the embedded topics", new Object[0])).isTrue();
    }

    public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) throws Exception {
        this.consumeFromEmbeddedTopics(consumer, topic);
    }

    public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String ... topics) throws Exception {
        for (String topic : topics) {
            Assertions.assertThat((Object[])this.topics).as("topic '" + topic + "' is not in embedded topic list", new Object[0]).contains((Object[])new String[]{topic});
        }
        consumer.subscribe(Arrays.asList(topics), new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("partitions assigned: " + partitions));
                }
            }
        });
        logger.debug((Object)"Subscription Initiated");
    }
}

