aboutsummaryrefslogtreecommitdiffstats
path: root/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java')
-rw-r--r--event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java191
1 files changed, 28 insertions, 163 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
index c564198..c10b46c 100644
--- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
+++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
@@ -22,28 +22,23 @@ package org.onap.aai.event.client;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import javax.naming.ConfigurationException;
+
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.event.api.EventConsumer;
import org.onap.aai.event.api.MessageWithOffset;
-import com.rabbitmq.client.Address;
-import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.AMQP.Exchange.DeclareOk;
/**
* Event bus client consumer wrapper for RabbitMQ.
@@ -57,27 +52,16 @@ import com.rabbitmq.client.AMQP.Exchange.DeclareOk;
*/
public class RabbitMqEventConsumer implements EventConsumer {
private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class);
- private static ConnectionFactory factory = new ConnectionFactory();
- private BlockingQueue<MessageWithOffset> messageQueue;
- private static String BINDING_CONSUME_ALL = "#";
+ private BlockingQueue<MessageWithOffset> messageQueue;
private final Connection connection;
private final Channel channel;
- private final String queueName;
- private DeclareOk exchangeInfo;
private Long lastDeliveryTag;
private long timeout = 5000;
- private com.rabbitmq.client.AMQP.Queue.DeclareOk queueInfo;
+ private RabbitMqClientConfig config;
+
- /**
- * (intended for testing prupose only)
- * @param connFactory
- */
- static void setConnectionFactory(ConnectionFactory connFactory) {
- factory = connFactory;
- }
-
/**
* (intended for testing prupose only)
* @param messageQueue
@@ -86,148 +70,29 @@ public class RabbitMqEventConsumer implements EventConsumer {
this.messageQueue = messageQueue;
}
- /**
- * Constructor to open a consumer on single host port for a topic exchange with specific queue name which will
- * consume all messages from topic
- * @param host
- * @param port
- * @param userName
- * @param password
- * @param exchangeName
- * @param queueName
- * @throws Exception
- */
- public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeName, String queueName) throws Exception {
- this(host, port, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName, queueName, BINDING_CONSUME_ALL);
- }
-
- /**
- * Constructor to open a consumer on single host port for a exchange with specific queue name which will
- * consume all messages from topic
- * @param host
- * @param port
- * @param userName
- * @param password
- * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS
- * @param exchangeName
- * @param queueName
- * @throws Exception
- */
- public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeType, String exchangeName, String queueName) throws Exception {
- this(host, port, userName, password, exchangeType, exchangeName, queueName, BINDING_CONSUME_ALL);
- }
-
- /**
- * Constructor to open a consumer on single host port for a exchange with specific queue name
- * @param host
- * @param port
- * @param userName
- * @param password
- * @param exchangeType
- * @param exchangeName
- * @param queueName
- * @param bindingKey - Bind the queue to specific messages only
- * @throws Exception
- */
- public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeType, String exchangeName, String queueName, String bindingKey) throws Exception {
- this(buildMap(host, port), userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName, queueName, bindingKey, new HashMap<String, Object>());
- }
-
- /**
- * Constructor to open a consumer on multiple host port for a topic exchange with specific queue name which will
- * consume all messages from topic
- * @param hostPortMap
- * @param userName
- * @param password
- * @param exchangeName
- * @param queueName
- * @throws Exception
- */
- public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeName, String queueName) throws Exception {
- this(hostPortMap, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName, queueName, BINDING_CONSUME_ALL);
- }
-
- /**
- * Constructor to open a consumer on multiple host port for a exchange with specific queue name which will
- * consume all messages from topic
- * @param hostPortMap
- * @param userName
- * @param password
- * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS
- * @param exchangeName
- * @param queueName
- * @throws Exception
- */
- public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String queueName) throws Exception {
- this(hostPortMap, userName, password, exchangeType, exchangeName, queueName, BINDING_CONSUME_ALL);
- }
-
- /**
- * Constructor to open a consumer on multiple host port for a exchange with specific queue name
- * @param hostPortMap
- * @param userName
- * @param password
- * @param exchangeType
- * @param exchangeName
- * @param queueName
- * @param bindingKey - Bind the queue to specific messages only
- * @throws Exception
- */
- public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String queueName, String bindingKey) throws Exception {
- this(hostPortMap, userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName, queueName, bindingKey, new HashMap<String, Object>());
- }
-
- public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, BuiltinExchangeType exchangeType, String exchangeName, String queueName, String bindingKey, Map<String, Object> exchangeArguments) throws Exception {
- messageQueue = new ArrayBlockingQueue<>(1000);
- List<Address> addresses = new ArrayList<Address>();
- Iterator<String> iter = hostPortMap.keySet().iterator();
- while (iter.hasNext())
- {
- String host = iter.next();
- int port = hostPortMap.get(host);
- Address add = new Address(host,port);
- addresses.add(add);
- }
- factory.setUsername(userName);
- factory.setPassword(password);
- connection = factory.newConnection(addresses);
- channel = connection.createChannel();
- exchangeInfo = channel.exchangeDeclare(exchangeName, exchangeType, true, false, exchangeArguments);
- this.queueName = queueName;
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, exchangeName, bindingKey);
- String consumerTag = channel.basicConsume(queueName, false, new CallBackConsumer(channel)); //AutoAck is false
- }
-
- public RabbitMqEventConsumer(String host, int port, String userName, String password, String queue) throws Exception {
- this(buildMap(host, port), userName, password, queue, new HashMap<String, Object>());
- }
-
- public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String queue, Map<String, Object> queueArguments) throws Exception {
- messageQueue = new ArrayBlockingQueue<>(1000);
- List<Address> addresses = new ArrayList<Address>();
- Iterator<String> iter = hostPortMap.keySet().iterator();
- while (iter.hasNext())
- {
- String host = iter.next();
- int port = hostPortMap.get(host);
- Address add = new Address(host,port);
- addresses.add(add);
- }
- factory.setUsername(userName);
- factory.setPassword(password);
- connection = factory.newConnection(addresses);
- channel = connection.createChannel();
- this.queueName = queue;
- queueInfo = channel.queueDeclare(queueName, true, false, false, queueArguments);
- String consumerTag = channel.basicConsume(queueName, false, new CallBackConsumer(channel)); //AutoAck is false
- }
-
- private static Map<String, Integer> buildMap(String host, Integer port) {
- Map<String, Integer> hostPortMap = new HashMap<String, Integer>();
- hostPortMap.put(host, port);
- return hostPortMap;
- }
+ public RabbitMqEventConsumer(RabbitMqClientConfig config) throws Exception {
+ this.config = config;
+ this.messageQueue = new ArrayBlockingQueue<>(1000);
+
+ if (config.getQueue() == null) {
+ throw new ConfigurationException("Mandatory config param queue not set");
+ }
+
+ this.connection = RabbitMqUtils.createConnection(config);
+ this.channel = connection.createChannel();
+
+ if (config.getExchangeName() != null) {
+ channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments());
+ channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments());
+ channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey());
+ }
+ else {
+ channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments());
+ }
+
+ channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false
+ }
+
@Override
public Iterable<String> consumeAndCommit() throws Exception {