aboutsummaryrefslogtreecommitdiffstats
path: root/event-client-kafka
diff options
context:
space:
mode:
Diffstat (limited to 'event-client-kafka')
-rw-r--r--event-client-kafka/pom.xml31
-rw-r--r--event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java143
-rw-r--r--event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java185
-rw-r--r--event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java94
-rw-r--r--event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java77
5 files changed, 530 insertions, 0 deletions
diff --git a/event-client-kafka/pom.xml b/event-client-kafka/pom.xml
new file mode 100644
index 0000000..75469b9
--- /dev/null
+++ b/event-client-kafka/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.onap.aai.event</groupId>
+ <artifactId>event-client-service</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>event-client-kafka</artifactId>
+
+ <properties>
+ <common.logging.version>1.2.0</common.logging.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.10.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.aai.logging-service</groupId>
+ <artifactId>common-logging</artifactId>
+ <version>${common.logging.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.aai.event</groupId>
+ <artifactId>event-client-api</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ </dependencies>
+</project> \ No newline at end of file
diff --git a/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java
new file mode 100644
index 0000000..e4da20a
--- /dev/null
+++ b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java
@@ -0,0 +1,143 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.MessageWithOffset;
+
+/**
+ * Event Bus Client consumer API for Kafka Implementation .Its a wrapper around KafkaConsumer which is NOT thread safe.
+ * The KafkaConsumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer
+ * after use will leak these connections Ref : https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/
+ * KafkaConsumer.html
+ *
+ */
+public class KafkaEventConsumer implements EventConsumer {
+
+ private static Logger log = LoggerFactory.getInstance().getLogger(KafkaEventConsumer.class);
+
+ public interface KafkaConsumerFactory {
+ public KafkaConsumer<String, String> createConsumer(Properties props);
+ }
+
+ private static KafkaConsumerFactory consumerFactory = KafkaConsumer::new;
+
+ private final KafkaConsumer<String, String> consumer;
+
+ /**
+ * Replace the consumer factory (intended to be used for testing purposes only).
+ *
+ * @param consumerFactory
+ */
+ static void setConsumerFactory(KafkaConsumerFactory consumerFactory) {
+ KafkaEventConsumer.consumerFactory = consumerFactory;
+ }
+
+ /**
+ *
+ * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The
+ * client will make use of all servers irrespective of which servers are specified here for
+ * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list
+ * should be in the form host1:port1,host2:port2,....
+ * @param topic - Topic to consume the messages from
+ * @param groupId - A unique string that identifies the consumer group this consumer belongs to
+ */
+ public KafkaEventConsumer(String hosts, String topic, String groupId) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+ // Set this property, if auto commit should happen.
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumer = consumerFactory.createConsumer(props);
+ consumer.subscribe(Arrays.asList(topic));
+ }
+
+ public void close() {
+ consumer.close();
+ }
+
+ /**
+ *
+ */
+ @Override
+ public Iterable<String> consume() throws Exception {
+ log.debug("Querying Kafka for messages");
+ ConsumerRecords<String, String> records = consumer.poll(0);
+ List<String> list = new ArrayList<>();
+ for (ConsumerRecord<String, String> record : records) {
+ list.add(record.value());
+ }
+ return list;
+
+ }
+
+ @Override
+ public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception {
+ log.debug("Querying Kafka for messages");
+ ConsumerRecords<String, String> records = consumer.poll(0);
+ List<MessageWithOffset> list = new ArrayList<>();
+ for (ConsumerRecord<String, String> record : records) {
+ list.add(new MessageWithOffset(record.offset(), record.value()));
+ }
+ return list;
+ }
+
+ @Override
+ public Iterable<String> consumeAndCommit() throws Exception {
+ Iterable<String> result = consume();
+ consumer.commitSync();
+ return result;
+ }
+
+ @Override
+ public void commitOffsets() throws Exception {
+ consumer.commitSync();
+ }
+
+ @Override
+ public void commitOffsets(long offset) throws Exception {
+ Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
+ offsetsMap.put(consumer.assignment().iterator().next(), new OffsetAndMetadata(offset));
+ consumer.commitSync(offsetsMap);
+ }
+
+}
diff --git a/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java
new file mode 100644
index 0000000..c05d5c5
--- /dev/null
+++ b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java
@@ -0,0 +1,185 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.event.client;
+
+import java.util.Collection;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventPublisher;
+
+/**
+ * Event Bus Client publisher implementation for Kafka .A KafkaProducer that publishes records to the Kafka cluster is
+ * thread safe and sharing a single producer instance across threads will generally be faster than having multiple
+ * instances. Ref :https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/
+ * producer/KafkaProducer.html
+ *
+ *
+ */
+public class KafkaEventPublisher implements EventPublisher {
+
+ private static Logger log = LoggerFactory.getInstance().getLogger(KafkaEventPublisher.class);
+
+ public interface KafkaProducerFactory {
+ public KafkaProducer<String, String> createProducer(Properties props);
+ }
+
+ private static final String PUBLISHING = "Publishing ";
+
+ private static KafkaProducerFactory producerFactory = KafkaProducer::new;
+
+ private final KafkaProducer<String, String> producer;
+ private final String topic;
+
+ /**
+ * Replace the producer factory (intended to be used for testing purposes only).
+ *
+ * @param producerFactory
+ */
+ static void setProducerFactory(KafkaProducerFactory producerFactory) {
+ KafkaEventPublisher.producerFactory = producerFactory;
+ }
+
+ /**
+ *
+ * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The
+ * client will make use of all servers irrespective of which servers are specified here for
+ * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list
+ * should be in the form host1:port1,host2:port2,....
+ * @param topic - Topic to publish the messages to
+ * @param bufferMemory - The total bytes of memory the producer can use to buffer records waiting to be sent to the
+ * server
+ * @param batchSize - The producer will attempt to batch records together into fewer requests whenever multiple
+ * records are being sent to the same partition. This helps performance on both the client and the server.
+ * This configuration controls the default batch size in bytes
+ * @param retries -Setting a value greater than zero will cause the client to resend any record whose send fails
+ * with a potentially transient error. Note that this retry is no different than if the client resent the
+ * record upon receiving the error.
+ */
+ public KafkaEventPublisher(String hosts, String topic, long bufferMemory, int batchSize, int retries) {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
+ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+ props.put(ProducerConfig.RETRIES_CONFIG, retries);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producer = producerFactory.createProducer(props);
+ this.topic = topic;
+
+ }
+
+ /**
+ *
+ * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The
+ * client will make use of all servers irrespective of which servers are specified here for
+ * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list
+ * should be in the form host1:port1,host2:port2,....
+ * @param topic - Topic to publish the messages to
+ */
+ public KafkaEventPublisher(String hosts, String topic) {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producer = producerFactory.createProducer(props);
+ this.topic = topic;
+ }
+
+ /**
+ * Closes the publisher.
+ */
+ @Override
+ public void close() {
+ producer.close();
+ }
+
+ @Override
+ public int sendSync(String partitionKey, String message) throws Exception {
+ log.debug("Publishing message on partitionKey " + partitionKey + ": " + message);
+ producer.send(new ProducerRecord<String, String>(topic, partitionKey, message)).get();
+ return 1;
+ }
+
+ @Override
+ public int sendSync(String partitionKey, Collection<String> messages) throws Exception {
+ log.debug(PUBLISHING + messages.size() + " messages on partitionKey " + partitionKey);
+ for (String message : messages) {
+ sendSync(partitionKey, message);
+ }
+ return messages.size();
+ }
+
+ @Override
+ public int sendSync(String message) throws Exception {
+ log.debug("Publishing message : " + message);
+ producer.send(new ProducerRecord<String, String>(topic, message)).get();
+ return 1;
+ }
+
+ @Override
+ public int sendSync(Collection<String> messages) throws Exception {
+ log.debug(PUBLISHING + messages.size() + " messages ");
+ for (String message : messages) {
+ sendSync(message);
+ }
+ return messages.size();
+ }
+
+ @Override
+ public void sendAsync(String partitionKey, String message) throws Exception {
+ log.debug("Publishing message on partitionKey " + partitionKey + ": " + message);
+ producer.send(new ProducerRecord<String, String>(topic, partitionKey, message));
+
+ }
+
+ @Override
+ public void sendAsync(String partitionKey, Collection<String> messages) throws Exception {
+ log.debug(PUBLISHING + messages.size() + " messages on partitionKey " + partitionKey);
+ for (String message : messages) {
+ sendAsync(partitionKey, message);
+ }
+
+ }
+
+ @Override
+ public void sendAsync(String message) throws Exception {
+ log.debug("Publishing message : " + message);
+ producer.send(new ProducerRecord<String, String>(topic, message));
+
+ }
+
+ @Override
+ public void sendAsync(Collection<String> messages) throws Exception {
+ log.debug(PUBLISHING + messages.size() + " messages ");
+ for (String message : messages) {
+ sendAsync(message);
+ }
+
+ }
+
+}
diff --git a/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java
new file mode 100644
index 0000000..cd9993b
--- /dev/null
+++ b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java
@@ -0,0 +1,94 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+
+package org.onap.aai.event.client;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestKafkaEventConsumer {
+
+ @Mock
+ public KafkaConsumer<String, String> mockKafkaConsumer;
+
+ @Before
+ public void init() throws Exception {
+ KafkaEventConsumer.setConsumerFactory(props -> mockKafkaConsumer);
+ }
+
+ @Test
+ public void testConstructor() {
+ new KafkaEventConsumer("", "", "");
+ }
+
+ @Test
+ public void consumeZeroRecords() throws Exception {
+ Mockito.when(mockKafkaConsumer.poll(Mockito.anyLong())).thenReturn(ConsumerRecords.empty());
+ KafkaEventConsumer consumer = new KafkaEventConsumer("", "", "");
+ consumer.consume();
+ consumer.consumeWithOffsets();
+ consumer.consumeAndCommit();
+ consumer.close();
+ }
+
+ @Test
+ public void consumeMultipleRecords() throws Exception {
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ records.put(new TopicPartition(null, 0),
+ Arrays.asList(new ConsumerRecord<String, String>("topic", 0, 0, "key", "value")));
+ Mockito.when(mockKafkaConsumer.poll(Mockito.anyLong())).thenReturn(new ConsumerRecords<>(records));
+ KafkaEventConsumer consumer = new KafkaEventConsumer("", "", "");
+ consumer.consume();
+ consumer.consumeWithOffsets();
+ consumer.consumeAndCommit();
+ consumer.close();
+ }
+
+ @Test
+ public void commitOffsets() throws Exception {
+ List<TopicPartition> partitionsList = Arrays.asList(new TopicPartition(null, 0));
+ Set<TopicPartition> partitionsSet = Collections.unmodifiableSet(new HashSet<TopicPartition>(partitionsList));
+ Mockito.when(mockKafkaConsumer.assignment()).thenReturn(partitionsSet);
+ KafkaEventConsumer consumer = new KafkaEventConsumer("", "", "");
+ consumer.commitOffsets();
+ consumer.commitOffsets(0L);
+ consumer.close();
+ }
+
+}
diff --git a/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java
new file mode 100644
index 0000000..8cb1dec
--- /dev/null
+++ b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java
@@ -0,0 +1,77 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+
+package org.onap.aai.event.client;
+
+import java.util.Arrays;
+import java.util.concurrent.Future;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestKafkaEventPublisher {
+
+ @Mock
+ public KafkaProducer<String, String> mockKafkaProducer;
+
+ @Mock
+ private Future<RecordMetadata> mockedFuture;
+
+ @Before
+ public void init() throws Exception {
+ KafkaEventPublisher.setProducerFactory(props -> mockKafkaProducer);
+ }
+
+ @Test
+ public void testConstructors() {
+ new KafkaEventPublisher("hosts", "topic");
+ new KafkaEventPublisher("hosts", "topic", 0, 0, 0);
+ }
+
+ @Test
+ public void publishSynchronous() throws Exception {
+ Mockito.when(mockKafkaProducer.send(Mockito.any())).thenReturn(mockedFuture);
+ KafkaEventPublisher publisher = new KafkaEventPublisher("hosts", "topic");
+ publisher.sendSync("");
+ publisher.sendSync(Arrays.asList(""));
+ publisher.sendSync("key", "");
+ publisher.sendSync("key", Arrays.asList(""));
+ publisher.close();
+ }
+
+ @Test
+ public void publishAsynchronous() throws Exception {
+ KafkaEventPublisher publisher = new KafkaEventPublisher("hosts", "topic");
+ publisher.sendAsync("");
+ publisher.sendAsync(Arrays.asList(""));
+ publisher.sendAsync("key", "");
+ publisher.sendAsync("key", Arrays.asList(""));
+ publisher.close();
+ }
+}