diff options
Diffstat (limited to 'event-client-kafka/src')
4 files changed, 499 insertions, 0 deletions
diff --git a/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java new file mode 100644 index 0000000..e4da20a --- /dev/null +++ b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventConsumer.java @@ -0,0 +1,143 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.event.client; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.MessageWithOffset; + +/** + * Event Bus Client consumer API for Kafka Implementation .Its a wrapper around KafkaConsumer which is NOT thread safe. + * The KafkaConsumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer + * after use will leak these connections Ref : https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ + * KafkaConsumer.html + * + */ +public class KafkaEventConsumer implements EventConsumer { + + private static Logger log = LoggerFactory.getInstance().getLogger(KafkaEventConsumer.class); + + public interface KafkaConsumerFactory { + public KafkaConsumer<String, String> createConsumer(Properties props); + } + + private static KafkaConsumerFactory consumerFactory = KafkaConsumer::new; + + private final KafkaConsumer<String, String> consumer; + + /** + * Replace the consumer factory (intended to be used for testing purposes only). + * + * @param consumerFactory + */ + static void setConsumerFactory(KafkaConsumerFactory consumerFactory) { + KafkaEventConsumer.consumerFactory = consumerFactory; + } + + /** + * + * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The + * client will make use of all servers irrespective of which servers are specified here for + * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list + * should be in the form host1:port1,host2:port2,.... + * @param topic - Topic to consume the messages from + * @param groupId - A unique string that identifies the consumer group this consumer belongs to + */ + public KafkaEventConsumer(String hosts, String topic, String groupId) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + + // Set this property, if auto commit should happen. + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumer = consumerFactory.createConsumer(props); + consumer.subscribe(Arrays.asList(topic)); + } + + public void close() { + consumer.close(); + } + + /** + * + */ + @Override + public Iterable<String> consume() throws Exception { + log.debug("Querying Kafka for messages"); + ConsumerRecords<String, String> records = consumer.poll(0); + List<String> list = new ArrayList<>(); + for (ConsumerRecord<String, String> record : records) { + list.add(record.value()); + } + return list; + + } + + @Override + public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception { + log.debug("Querying Kafka for messages"); + ConsumerRecords<String, String> records = consumer.poll(0); + List<MessageWithOffset> list = new ArrayList<>(); + for (ConsumerRecord<String, String> record : records) { + list.add(new MessageWithOffset(record.offset(), record.value())); + } + return list; + } + + @Override + public Iterable<String> consumeAndCommit() throws Exception { + Iterable<String> result = consume(); + consumer.commitSync(); + return result; + } + + @Override + public void commitOffsets() throws Exception { + consumer.commitSync(); + } + + @Override + public void commitOffsets(long offset) throws Exception { + Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>(); + offsetsMap.put(consumer.assignment().iterator().next(), new OffsetAndMetadata(offset)); + consumer.commitSync(offsetsMap); + } + +} diff --git a/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java new file mode 100644 index 0000000..c05d5c5 --- /dev/null +++ b/event-client-kafka/src/main/java/org/onap/aai/event/client/KafkaEventPublisher.java @@ -0,0 +1,185 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.event.client; + +import java.util.Collection; +import java.util.Properties; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventPublisher; + +/** + * Event Bus Client publisher implementation for Kafka .A KafkaProducer that publishes records to the Kafka cluster is + * thread safe and sharing a single producer instance across threads will generally be faster than having multiple + * instances. Ref :https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/ + * producer/KafkaProducer.html + * + * + */ +public class KafkaEventPublisher implements EventPublisher { + + private static Logger log = LoggerFactory.getInstance().getLogger(KafkaEventPublisher.class); + + public interface KafkaProducerFactory { + public KafkaProducer<String, String> createProducer(Properties props); + } + + private static final String PUBLISHING = "Publishing "; + + private static KafkaProducerFactory producerFactory = KafkaProducer::new; + + private final KafkaProducer<String, String> producer; + private final String topic; + + /** + * Replace the producer factory (intended to be used for testing purposes only). + * + * @param producerFactory + */ + static void setProducerFactory(KafkaProducerFactory producerFactory) { + KafkaEventPublisher.producerFactory = producerFactory; + } + + /** + * + * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The + * client will make use of all servers irrespective of which servers are specified here for + * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list + * should be in the form host1:port1,host2:port2,.... + * @param topic - Topic to publish the messages to + * @param bufferMemory - The total bytes of memory the producer can use to buffer records waiting to be sent to the + * server + * @param batchSize - The producer will attempt to batch records together into fewer requests whenever multiple + * records are being sent to the same partition. This helps performance on both the client and the server. + * This configuration controls the default batch size in bytes + * @param retries -Setting a value greater than zero will cause the client to resend any record whose send fails + * with a potentially transient error. Note that this retry is no different than if the client resent the + * record upon receiving the error. + */ + public KafkaEventPublisher(String hosts, String topic, long bufferMemory, int batchSize, int retries) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); + props.put(ProducerConfig.RETRIES_CONFIG, retries); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producer = producerFactory.createProducer(props); + this.topic = topic; + + } + + /** + * + * @param hosts - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The + * client will make use of all servers irrespective of which servers are specified here for + * bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list + * should be in the form host1:port1,host2:port2,.... + * @param topic - Topic to publish the messages to + */ + public KafkaEventPublisher(String hosts, String topic) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producer = producerFactory.createProducer(props); + this.topic = topic; + } + + /** + * Closes the publisher. + */ + @Override + public void close() { + producer.close(); + } + + @Override + public int sendSync(String partitionKey, String message) throws Exception { + log.debug("Publishing message on partitionKey " + partitionKey + ": " + message); + producer.send(new ProducerRecord<String, String>(topic, partitionKey, message)).get(); + return 1; + } + + @Override + public int sendSync(String partitionKey, Collection<String> messages) throws Exception { + log.debug(PUBLISHING + messages.size() + " messages on partitionKey " + partitionKey); + for (String message : messages) { + sendSync(partitionKey, message); + } + return messages.size(); + } + + @Override + public int sendSync(String message) throws Exception { + log.debug("Publishing message : " + message); + producer.send(new ProducerRecord<String, String>(topic, message)).get(); + return 1; + } + + @Override + public int sendSync(Collection<String> messages) throws Exception { + log.debug(PUBLISHING + messages.size() + " messages "); + for (String message : messages) { + sendSync(message); + } + return messages.size(); + } + + @Override + public void sendAsync(String partitionKey, String message) throws Exception { + log.debug("Publishing message on partitionKey " + partitionKey + ": " + message); + producer.send(new ProducerRecord<String, String>(topic, partitionKey, message)); + + } + + @Override + public void sendAsync(String partitionKey, Collection<String> messages) throws Exception { + log.debug(PUBLISHING + messages.size() + " messages on partitionKey " + partitionKey); + for (String message : messages) { + sendAsync(partitionKey, message); + } + + } + + @Override + public void sendAsync(String message) throws Exception { + log.debug("Publishing message : " + message); + producer.send(new ProducerRecord<String, String>(topic, message)); + + } + + @Override + public void sendAsync(Collection<String> messages) throws Exception { + log.debug(PUBLISHING + messages.size() + " messages "); + for (String message : messages) { + sendAsync(message); + } + + } + +} diff --git a/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java new file mode 100644 index 0000000..cd9993b --- /dev/null +++ b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java @@ -0,0 +1,94 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ + +package org.onap.aai.event.client; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestKafkaEventConsumer { + + @Mock + public KafkaConsumer<String, String> mockKafkaConsumer; + + @Before + public void init() throws Exception { + KafkaEventConsumer.setConsumerFactory(props -> mockKafkaConsumer); + } + + @Test + public void testConstructor() { + new KafkaEventConsumer("", "", ""); + } + + @Test + public void consumeZeroRecords() throws Exception { + Mockito.when(mockKafkaConsumer.poll(Mockito.anyLong())).thenReturn(ConsumerRecords.empty()); + KafkaEventConsumer consumer = new KafkaEventConsumer("", "", ""); + consumer.consume(); + consumer.consumeWithOffsets(); + consumer.consumeAndCommit(); + consumer.close(); + } + + @Test + public void consumeMultipleRecords() throws Exception { + Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); + records.put(new TopicPartition(null, 0), + Arrays.asList(new ConsumerRecord<String, String>("topic", 0, 0, "key", "value"))); + Mockito.when(mockKafkaConsumer.poll(Mockito.anyLong())).thenReturn(new ConsumerRecords<>(records)); + KafkaEventConsumer consumer = new KafkaEventConsumer("", "", ""); + consumer.consume(); + consumer.consumeWithOffsets(); + consumer.consumeAndCommit(); + consumer.close(); + } + + @Test + public void commitOffsets() throws Exception { + List<TopicPartition> partitionsList = Arrays.asList(new TopicPartition(null, 0)); + Set<TopicPartition> partitionsSet = Collections.unmodifiableSet(new HashSet<TopicPartition>(partitionsList)); + Mockito.when(mockKafkaConsumer.assignment()).thenReturn(partitionsSet); + KafkaEventConsumer consumer = new KafkaEventConsumer("", "", ""); + consumer.commitOffsets(); + consumer.commitOffsets(0L); + consumer.close(); + } + +} diff --git a/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java new file mode 100644 index 0000000..8cb1dec --- /dev/null +++ b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java @@ -0,0 +1,77 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ + +package org.onap.aai.event.client; + +import java.util.Arrays; +import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestKafkaEventPublisher { + + @Mock + public KafkaProducer<String, String> mockKafkaProducer; + + @Mock + private Future<RecordMetadata> mockedFuture; + + @Before + public void init() throws Exception { + KafkaEventPublisher.setProducerFactory(props -> mockKafkaProducer); + } + + @Test + public void testConstructors() { + new KafkaEventPublisher("hosts", "topic"); + new KafkaEventPublisher("hosts", "topic", 0, 0, 0); + } + + @Test + public void publishSynchronous() throws Exception { + Mockito.when(mockKafkaProducer.send(Mockito.any())).thenReturn(mockedFuture); + KafkaEventPublisher publisher = new KafkaEventPublisher("hosts", "topic"); + publisher.sendSync(""); + publisher.sendSync(Arrays.asList("")); + publisher.sendSync("key", ""); + publisher.sendSync("key", Arrays.asList("")); + publisher.close(); + } + + @Test + public void publishAsynchronous() throws Exception { + KafkaEventPublisher publisher = new KafkaEventPublisher("hosts", "topic"); + publisher.sendAsync(""); + publisher.sendAsync(Arrays.asList("")); + publisher.sendAsync("key", ""); + publisher.sendAsync("key", Arrays.asList("")); + publisher.close(); + } +} |