aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWiktor Garbarek <wiktor.garbarek@nokia.com>2018-06-26 16:24:27 +0200
committerWiktor Garbarek <wiktor.garbarek@nokia.com>2018-06-26 16:26:11 +0200
commit6d30dcbdc6d8a4b2633f47f5076335d5a06518b3 (patch)
tree043310f64a3b77e7a812e7c0d2804e46913d4cf2
parent95c897c3c85c305f676dc04adc2f167fecfc598b (diff)
Refactor of KafkaEventConsumer
Moved constructor; added annotations Change-Id: I26c81b16381ed69e170b236cea684be615fa795b Issue-ID: AAI-1248 Signed-off-by: Wiktor Garbarek <wiktor.garbarek@nokia.com>
-rw-r--r--event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java20
1 files changed, 11 insertions, 9 deletions
diff --git a/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java
index 1deb90a..e08639b 100644
--- a/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java
+++ b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java
@@ -49,6 +49,7 @@ public class KafkaEventConsumer implements EventConsumer {
private static Logger log = LoggerFactory.getInstance().getLogger(KafkaEventConsumer.class);
+ @FunctionalInterface
public interface KafkaConsumerFactory {
public KafkaConsumer<String, String> createConsumer(Properties props);
}
@@ -58,15 +59,6 @@ public class KafkaEventConsumer implements EventConsumer {
private final KafkaConsumer<String, String> consumer;
/**
- * Replace the consumer factory (intended to be used for testing purposes only).
- *
- * @param consumerFactory
- */
- static void setConsumerFactory(KafkaConsumerFactory consumerFactory) {
- KafkaEventConsumer.consumerFactory = consumerFactory;
- }
-
- /**
*
* @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The
* client will make use of all servers irrespective of which servers are specified here for
@@ -89,6 +81,16 @@ public class KafkaEventConsumer implements EventConsumer {
consumer.subscribe(Arrays.asList(topic));
}
+ /**
+ * Replace the consumer factory (intended to be used for testing purposes only).
+ *
+ * @param consumerFactory
+ */
+ static void setConsumerFactory(KafkaConsumerFactory consumerFactory) {
+ KafkaEventConsumer.consumerFactory = consumerFactory;
+ }
+
+ @Override
public void close() {
consumer.close();
}