diff options
author | 2018-01-29 15:00:04 +0000 | |
---|---|---|
committer | 2018-01-30 10:24:18 +0000 | |
commit | 4150ee34ae503c83734aca7e62ca9911b354c881 (patch) | |
tree | 6598a1d16483eb480664b8862c86ba801af6d661 /event-client-kafka/src/test/java/org/onap | |
parent | 4e6e8b2714f3e5fe6d6dc064d9d73275c5ee5437 (diff) |
Initial code submit for Event Bus Client library
Change-Id: If0ff82c669872c734ebe0c72bc022beb96c53d11
Issue-ID: AAI-700
Signed-off-by: Gilding, Joyce (jg640n) <Joyce.Gilding@amdocs.com>
Diffstat (limited to 'event-client-kafka/src/test/java/org/onap')
-rw-r--r-- | event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java | 94 | ||||
-rw-r--r-- | event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java | 77 |
2 files changed, 171 insertions, 0 deletions
diff --git a/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java new file mode 100644 index 0000000..cd9993b --- /dev/null +++ b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventConsumer.java @@ -0,0 +1,94 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ + +package org.onap.aai.event.client; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestKafkaEventConsumer { + + @Mock + public KafkaConsumer<String, String> mockKafkaConsumer; + + @Before + public void init() throws Exception { + KafkaEventConsumer.setConsumerFactory(props -> mockKafkaConsumer); + } + + @Test + public void testConstructor() { + new KafkaEventConsumer("", "", ""); + } + + @Test + public void consumeZeroRecords() throws Exception { + Mockito.when(mockKafkaConsumer.poll(Mockito.anyLong())).thenReturn(ConsumerRecords.empty()); + KafkaEventConsumer consumer = new KafkaEventConsumer("", "", ""); + consumer.consume(); + consumer.consumeWithOffsets(); + consumer.consumeAndCommit(); + consumer.close(); + } + + @Test + public void consumeMultipleRecords() throws Exception { + Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); + records.put(new TopicPartition(null, 0), + Arrays.asList(new ConsumerRecord<String, String>("topic", 0, 0, "key", "value"))); + Mockito.when(mockKafkaConsumer.poll(Mockito.anyLong())).thenReturn(new ConsumerRecords<>(records)); + KafkaEventConsumer consumer = new KafkaEventConsumer("", "", ""); + consumer.consume(); + consumer.consumeWithOffsets(); + consumer.consumeAndCommit(); + consumer.close(); + } + + @Test + public void commitOffsets() throws Exception { + List<TopicPartition> partitionsList = Arrays.asList(new TopicPartition(null, 0)); + Set<TopicPartition> partitionsSet = Collections.unmodifiableSet(new HashSet<TopicPartition>(partitionsList)); + Mockito.when(mockKafkaConsumer.assignment()).thenReturn(partitionsSet); + KafkaEventConsumer consumer = new KafkaEventConsumer("", "", ""); + consumer.commitOffsets(); + consumer.commitOffsets(0L); + consumer.close(); + } + +} diff --git a/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java new file mode 100644 index 0000000..8cb1dec --- /dev/null +++ b/event-client-kafka/src/test/java/org/onap/aai/event/client/TestKafkaEventPublisher.java @@ -0,0 +1,77 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ + +package org.onap.aai.event.client; + +import java.util.Arrays; +import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestKafkaEventPublisher { + + @Mock + public KafkaProducer<String, String> mockKafkaProducer; + + @Mock + private Future<RecordMetadata> mockedFuture; + + @Before + public void init() throws Exception { + KafkaEventPublisher.setProducerFactory(props -> mockKafkaProducer); + } + + @Test + public void testConstructors() { + new KafkaEventPublisher("hosts", "topic"); + new KafkaEventPublisher("hosts", "topic", 0, 0, 0); + } + + @Test + public void publishSynchronous() throws Exception { + Mockito.when(mockKafkaProducer.send(Mockito.any())).thenReturn(mockedFuture); + KafkaEventPublisher publisher = new KafkaEventPublisher("hosts", "topic"); + publisher.sendSync(""); + publisher.sendSync(Arrays.asList("")); + publisher.sendSync("key", ""); + publisher.sendSync("key", Arrays.asList("")); + publisher.close(); + } + + @Test + public void publishAsynchronous() throws Exception { + KafkaEventPublisher publisher = new KafkaEventPublisher("hosts", "topic"); + publisher.sendAsync(""); + publisher.sendAsync(Arrays.asList("")); + publisher.sendAsync("key", ""); + publisher.sendAsync("key", Arrays.asList("")); + publisher.close(); + } +} |