From a19b69d1809955293390c8abd868d0eaff318e5b Mon Sep 17 00:00:00 2001 From: Gurjeet Bedi Date: Tue, 5 Jun 2018 13:13:54 -0400 Subject: Event Client library support for RabbitMQ Update version in RabbitMQ Event Client lib License header update Issue-ID: AAI-1205 Change-Id: I838c184a0b81d89d5d276fbeee373cf5c390fb22 Signed-off-by: Gurjeet Bedi --- .../aai/event/client/TestRabbitMqConsumer.java | 151 +++++++++++++++++++++ .../aai/event/client/TestRabbitMqPublisher.java | 123 +++++++++++++++++ 2 files changed, 274 insertions(+) create mode 100644 event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqConsumer.java create mode 100644 event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqPublisher.java (limited to 'event-client-rabbitmq/src/test/java') diff --git a/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqConsumer.java b/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqConsumer.java new file mode 100644 index 0000000..df4c54b --- /dev/null +++ b/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqConsumer.java @@ -0,0 +1,151 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2018 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.event.client; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.onap.aai.event.api.MessageWithOffset; + +import com.rabbitmq.client.Address; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; + +@RunWith(MockitoJUnitRunner.class) +public class TestRabbitMqConsumer { + @Mock + public ConnectionFactory mockConnectionFactory; + + @Mock + public Connection mockConnection; + + @Mock + public Channel mockChannel; + + @Mock + public DeclareOk mockDeclareOK; + + @Mock + BlockingQueue mqueue; + + BuiltinExchangeType topicEx = BuiltinExchangeType.TOPIC; + TimeUnit unit = TimeUnit.MILLISECONDS; + Map hostPortMap = new HashMap(); + + @Before + public void init() throws Exception { + RabbitMqEventConsumer.setConnectionFactory(mockConnectionFactory); + } + + @Test + public void testConstructor() throws Exception { + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + Mockito.when(mockChannel.basicConsume(Mockito.any(), Mockito.anyBoolean(), Mockito.any(Consumer.class))).thenReturn(Mockito.anyString()); + new RabbitMqEventConsumer("hosts", 0, "userName", "password", "exchangeName", "queueName"); + new RabbitMqEventConsumer("hosts", 0, "userName", "password", BuiltinExchangeType.DIRECT.name(), "exchangeName", "queueName"); + new RabbitMqEventConsumer(hostPortMap, "userName", "password", "exchangeName", "queueName"); + new RabbitMqEventConsumer(hostPortMap, "userName", "password", BuiltinExchangeType.DIRECT.name(), "exchangeName", "queueName"); + new RabbitMqEventConsumer("hosts", 0, "userName", "password", "queue"); + } + + @Test + public void consumeZeroRecords() throws Exception { + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + Mockito.when(mockChannel.basicConsume(Mockito.any(), Mockito.anyBoolean(), Mockito.any(Consumer.class))).thenReturn(Mockito.anyString()); + RabbitMqEventConsumer consumer = new RabbitMqEventConsumer("hosts", 0, "userName", "password", "exchangeName", "queueName"); + consumer.consume(); + consumer.consumeWithOffsets(); + consumer.consumeAndCommit(); + consumer.close(); + } + + @Test + public void commitOffsets() throws Exception { + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + Mockito.when(mockChannel.basicConsume(Mockito.any(), Mockito.anyBoolean(), Mockito.any(Consumer.class))).thenReturn(Mockito.anyString()); + RabbitMqEventConsumer consumer = new RabbitMqEventConsumer("hosts", 0, "userName", "password", "exchangeName", "queueName"); + consumer.commitOffsets(); + consumer.commitOffsets(0L); + consumer.close(); + } + + @Test + public void consumeMultipleRecords() throws Exception { + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + Mockito.when(mockChannel.basicConsume(Mockito.any(), Mockito.anyBoolean(), Mockito.any(Consumer.class))).thenReturn(Mockito.anyString()); + List records = buildTestMessages(2); + mqueue = new ArrayBlockingQueue<>(2); + mqueue.addAll(records); + RabbitMqEventConsumer consumer = new RabbitMqEventConsumer("hosts", 0, "userName", "password", "exchangeName", "queueName"); + consumer.setMessageQueue(mqueue); + consumer.consumeAndCommit(); + consumer.close(); + } + + @Test + public void consumeWithOffSetsMultipleRecords() throws Exception { + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + Mockito.when(mockChannel.basicConsume(Mockito.any(), Mockito.anyBoolean(), Mockito.any(Consumer.class))).thenReturn(Mockito.anyString()); + List records = buildTestMessages(2); + mqueue = new ArrayBlockingQueue<>(2); + mqueue.addAll(records); + RabbitMqEventConsumer consumer = new RabbitMqEventConsumer("hosts", 0, "userName", "password", "exchangeName", "queueName"); + consumer.setMessageQueue(mqueue); + consumer.consumeWithOffsets(); + consumer.commitOffsets(); + consumer.close(); + } + + private List buildTestMessages(int nbrOfMessages) { + List msgList = new ArrayList(); + for(int i=0;i hostPortMap = new HashMap(); + + @Before + public void init() throws Exception { + RabbitMqEventPublisher.setConnectionFactory(mockConnectionFactory); + } + + @Test + public void testConstructors() throws Exception { + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + new RabbitMqEventPublisher("hosts", 0, "userName", "password", "exchangeName"); + new RabbitMqEventPublisher("hosts", 0, "userName", "password", BuiltinExchangeType.DIRECT.name(), "exchangeName"); + new RabbitMqEventPublisher(hostPortMap, "userName", "password", "exchangeName"); + new RabbitMqEventPublisher(hostPortMap, "userName", "password", BuiltinExchangeType.DIRECT.name(), "exchangeName"); + new RabbitMqEventPublisher("hosts", 0, "userName", "password", "queueName",new HashMap()); + } + + + @Test + public void publishSynchronous() throws Exception { + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + RabbitMqEventPublisher publisher = new RabbitMqEventPublisher("hosts", 0, "userName", "password", "exchangeName"); + publisher.sendSync(""); + publisher.sendSync(Arrays.asList("")); + publisher.sendSync("key", ""); + publisher.sendSync("key", Arrays.asList("")); + publisher.close(); + } + + @Test + public void publishAsynchronous() throws Exception { + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + RabbitMqEventPublisher publisher = new RabbitMqEventPublisher("hosts", 0, "userName", "password", "exchangeName"); + publisher.sendAsync(""); + publisher.sendAsync(Arrays.asList("")); + publisher.sendAsync("key", ""); + publisher.sendAsync("key", Arrays.asList("")); + publisher.close(); + } + @Test(expected = UnsupportedOperationException.class) + public void UnsupportedMessageTest() throws Exception { + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + RabbitMqEventPublisher publisher = new RabbitMqEventPublisher("hosts", 0, "userName", "password", "queueName",new HashMap()); + publisher.sendSync("key", ""); + publisher.sendSync("key", Arrays.asList("")); + publisher.close(); + } + + @Test(expected = UnsupportedOperationException.class) + public void UnSupportedMultiMessageTests() throws Exception { + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + RabbitMqEventPublisher publisher = new RabbitMqEventPublisher("hosts", 0, "userName", "password", "queueName",new HashMap()); + publisher.sendSync("key", Arrays.asList("")); + publisher.close(); + } + +} -- cgit 1.2.3-korg