public class KafkaJunitRule
extends org.junit.rules.ExternalResource
| Constructor and Description |
|---|
| KafkaJunitRule() |
| KafkaJunitRule(int kafkaPort) |
| KafkaJunitRule(int kafkaPort, int zookeeperPort) |
| KafkaJunitRule(int kafkaPort, int zookeeperPort, Properties brokerProperties) |
| Modifier and Type | Method | Description |
|---|---|---|
| protected void | after() | |
| protected void | before() | |
| Properties | consumerConfig() | Create a consumer configuration. |
| Properties | consumerConfig(boolean enableAutoCommit) | Create a consumer configuration. |
| <K,V> org.apache.kafka.clients.consumer.KafkaConsumer<K,V> | createConsumer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer) | Create a Kafka consumer using consumerConfig(). |
| <K,V> org.apache.kafka.clients.consumer.KafkaConsumer<K,V> | createConsumer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, boolean autoCommitEnabled) | Create a Kafka consumer using consumerConfig(boolean). |
| <K,V> org.apache.kafka.clients.producer.KafkaProducer<K,V> | createProducer(org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer) | Create a Kafka producer using producerConfig(). |
| org.apache.kafka.clients.consumer.KafkaConsumer<String,String> | createStringConsumer() | Create a Kafka consumer that reads messages with String keys and values. |
| org.apache.kafka.clients.consumer.KafkaConsumer<String,String> | createStringConsumer(boolean autoCommitEnabled) | Create a Kafka consumer that reads messages with String keys and values. |
| org.apache.kafka.clients.producer.KafkaProducer<String,String> | createStringProducer() | Create a Kafka producer that produces messages with String keys and values. |
| int | kafkaBrokerPort() | Get the Kafka broker port. |
| Path | kafkaLogDir() | Get the Kafka log directory. |
| <K,V> com.google.common.util.concurrent.ListenableFuture<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> | pollMessages(String topic, int numMessagesToPoll, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer) | Poll the specified topic for messages. |
| com.google.common.util.concurrent.ListenableFuture<List<org.apache.kafka.clients.consumer.ConsumerRecord<String,String>>> | pollStringMessages(String topic, int numMessagesToPoll) | Poll the specified topic for messages with String keys and values. |
| Properties | producerConfig() | Create a producer configuration. |
| <T> List<T> | readMessages(org.apache.kafka.clients.consumer.KafkaConsumer<T,T> consumer, String topic, int expectedMessages, int timeoutSeconds) | Deprecated. Use pollMessages(String, int, Deserializer, Deserializer) instead. This method is deprecated due to the switch to Kafka 0.9. The new API can return more than the expected number of messages unless manual offset management is performed. Since it is impossible to ensure that the passed-in KafkaConsumer has auto commit disabled, this method should not be used. |
| List<String> | readStringMessages(String topic, int expectedMessages, int timeoutSeconds) | Deprecated. Use pollStringMessages(String, int) instead. This method is deprecated because it silently creates a new consumer on each invocation, which could lead to some messages being read more than once. It also suffers from the problems described in the deprecation notice for readMessages(KafkaConsumer, String, int, int). |
| void | shutdownKafka() | Shut down the Kafka broker before the test terminates, to test consumer exceptions. |
| void | startKafka() | Starts the server. |
| String | zookeeperConnectionString() | Get the ZooKeeper connection string. |
| int | zookeeperPort() | Get the ZooKeeper port. |
public KafkaJunitRule()
public KafkaJunitRule(int kafkaPort)
public KafkaJunitRule(int kafkaPort,
int zookeeperPort)
public KafkaJunitRule(int kafkaPort,
int zookeeperPort,
Properties brokerProperties)
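The constructors that take `kafkaPort` and `zookeeperPort` need port numbers that are not already in use on the test machine. One common way to obtain them, sketched below with only the JDK, is to bind to port 0 and read back the port the operating system assigned. The class and method names (`FreePorts`, `findTwoFreePorts`) are hypothetical helpers, not part of this library, and how the no-argument constructor chooses its ports is not described on this page.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class FreePorts {

    /**
     * Returns two distinct ports that were free at the time of the call.
     * Both sockets are open at the same time, so the OS cannot assign
     * the same port to both of them.
     */
    public static int[] findTwoFreePorts() {
        try (ServerSocket first = new ServerSocket(0);
             ServerSocket second = new ServerSocket(0)) {
            return new int[] {first.getLocalPort(), second.getLocalPort()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ports = findTwoFreePorts();
        // In a test class these values could be passed to the rule, e.g.
        // new KafkaJunitRule(ports[0], ports[1])
        System.out.println("kafkaPort=" + ports[0] + ", zookeeperPort=" + ports[1]);
    }
}
```

The sockets are closed before the ports are handed back, so another process could in principle claim one of them before the broker binds to it. For most test runs that window is small enough to ignore.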
protected void before()
throws Throwable
Overrides:
before in class org.junit.rules.ExternalResource
Throws:
Throwable
protected void after()
Overrides:
after in class org.junit.rules.ExternalResource
public void shutdownKafka()
public void startKafka()
public Properties producerConfig()
public Properties consumerConfig()
public Properties consumerConfig(boolean enableAutoCommit)
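Because `producerConfig()` and `consumerConfig()` return plain `java.util.Properties`, a test can layer its own settings on top before building a client. The sketch below shows that layering with the JDK only. `baseConsumerConfig` is a hypothetical stand-in for the object the rule would return; the keys the rule actually sets are not listed on this page, so the ones used here are standard Kafka consumer settings chosen for illustration.

```java
import java.util.Properties;

public class ConsumerConfigOverrides {

    // Hypothetical stand-in for the Properties returned by consumerConfig(boolean).
    // The real keys and values set by the rule are an assumption here.
    public static Properties baseConsumerConfig(int brokerPort, boolean enableAutoCommit) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:" + brokerPort);
        props.setProperty("enable.auto.commit", String.valueOf(enableAutoCommit));
        return props;
    }

    // Copies the base configuration and applies the overrides on top of it,
    // leaving both input objects unchanged.
    public static Properties withOverrides(Properties base, Properties overrides) {
        Properties merged = new Properties();
        merged.putAll(base);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        Properties overrides = new Properties();
        overrides.setProperty("group.id", "my-test-group");
        overrides.setProperty("auto.offset.reset", "earliest");

        Properties merged = withOverrides(baseConsumerConfig(9092, false), overrides);
        merged.stringPropertyNames()
              .forEach(key -> System.out.println(key + "=" + merged.getProperty(key)));
    }
}
```

In a real test, the base object would come from the rule itself, and the merged `Properties` could be passed to a `KafkaConsumer` constructor together with the key and value deserializers.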
public org.apache.kafka.clients.consumer.KafkaConsumer<String,String> createStringConsumer()
public org.apache.kafka.clients.consumer.KafkaConsumer<String,String> createStringConsumer(boolean autoCommitEnabled)
Parameters:
autoCommitEnabled - Set to true to enable auto commit
public <K,V> org.apache.kafka.clients.consumer.KafkaConsumer<K,V> createConsumer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer,
org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Create a Kafka consumer using consumerConfig()
public <K,V> org.apache.kafka.clients.consumer.KafkaConsumer<K,V> createConsumer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer,
org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer,
boolean autoCommitEnabled)
Create a Kafka consumer using consumerConfig(boolean)
public org.apache.kafka.clients.producer.KafkaProducer<String,String> createStringProducer()
public <K,V> org.apache.kafka.clients.producer.KafkaProducer<K,V> createProducer(org.apache.kafka.common.serialization.Serializer<K> keySerializer,
org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
Create a Kafka producer using producerConfig()
@Deprecated
public List<String> readStringMessages(String topic, int expectedMessages, int timeoutSeconds) throws TimeoutException
Deprecated. Use pollStringMessages(String, int) instead.
This method is deprecated because it silently creates a new consumer on each invocation, which could lead to
some messages being read more than once. It also suffers from the problems described in the deprecation notice for
readMessages(KafkaConsumer, String, int, int).
Throws:
TimeoutException
@Deprecated
public <T> List<T> readMessages(org.apache.kafka.clients.consumer.KafkaConsumer<T,T> consumer, String topic, int expectedMessages, int timeoutSeconds) throws TimeoutException
Deprecated. Use pollMessages(String, int, Deserializer, Deserializer) instead.
This method is deprecated due to the switch to Kafka 0.9. The new API can return more than the expected number of messages unless manual offset management is performed. Since it is impossible to ensure that the passed-in KafkaConsumer has auto commit disabled, this method should not be used.
Throws:
TimeoutException
public <K,V> com.google.common.util.concurrent.ListenableFuture<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> pollMessages(String topic, int numMessagesToPoll, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Type Parameters:
K - Type of Key
V - Type of Value
Parameters:
topic - Topic to poll
numMessagesToPoll - Number of messages to read
keyDeserializer - Key deserializer
valueDeserializer - Value deserializer
Returns:
ListenableFuture containing a list of ConsumerRecord objects
public com.google.common.util.concurrent.ListenableFuture<List<org.apache.kafka.clients.consumer.ConsumerRecord<String,String>>> pollStringMessages(String topic, int numMessagesToPoll)
Parameters:
topic - Topic to poll
numMessagesToPoll - Number of messages to read
Returns:
ListenableFuture containing a list of ConsumerRecord objects
public Path kafkaLogDir()
public int kafkaBrokerPort()
public int zookeeperPort()
public String zookeeperConnectionString()
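pollMessages and pollStringMessages return a ListenableFuture, which is also a `java.util.concurrent.Future`, so a test usually needs to bound how long it waits for the result. The following is a minimal sketch of such a bounded wait using only the JDK. `PollWithTimeout` and `await` are hypothetical names, and a `CompletableFuture` stands in for the future the rule would return. Whether cancelling the rule's future also stops its internal polling is not documented here and is assumed, not confirmed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PollWithTimeout {

    // Waits for the polled records for at most the given time.
    // On timeout the future is cancelled and an unchecked exception is thrown,
    // so the calling test fails fast with a readable message.
    public static <T> List<T> await(Future<List<T>> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new IllegalStateException("Timed out waiting for messages", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Polling failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for messages", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the future returned by pollStringMessages(topic, numMessagesToPoll).
        Future<List<String>> ready = CompletableFuture.completedFuture(Arrays.asList("a", "b"));
        System.out.println(await(ready, 5, TimeUnit.SECONDS));
    }
}
```

With the rule, the first argument would be the value returned by a call such as `pollStringMessages(topic, numMessagesToPoll)`, and the resulting list would contain ConsumerRecord objects instead of strings.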
Copyright © 2016. All rights reserved.