diff options
Diffstat (limited to 'event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java')
-rw-r--r-- | event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java | 191 |
1 files changed, 28 insertions, 163 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java index c564198..c10b46c 100644 --- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java @@ -22,28 +22,23 @@ package org.onap.aai.event.client; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import javax.naming.ConfigurationException; + import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.event.api.EventConsumer; import org.onap.aai.event.api.MessageWithOffset; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.AMQP.Exchange.DeclareOk; /** * Event bus client consumer wrapper for RabbitMQ. @@ -57,27 +52,16 @@ import com.rabbitmq.client.AMQP.Exchange.DeclareOk; */ public class RabbitMqEventConsumer implements EventConsumer { private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class); - private static ConnectionFactory factory = new ConnectionFactory(); - private BlockingQueue<MessageWithOffset> messageQueue; - private static String BINDING_CONSUME_ALL = "#"; + private BlockingQueue<MessageWithOffset> messageQueue; private final Connection connection; private final Channel channel; - private final String queueName; - private DeclareOk exchangeInfo; private Long lastDeliveryTag; private long timeout = 5000; - private com.rabbitmq.client.AMQP.Queue.DeclareOk queueInfo; + private RabbitMqClientConfig config; + - /** - * (intended for testing prupose only) - * @param connFactory - */ - static void setConnectionFactory(ConnectionFactory connFactory) { - factory = connFactory; - } - /** * (intended for testing prupose only) * @param messageQueue @@ -86,148 +70,29 @@ public class RabbitMqEventConsumer implements EventConsumer { this.messageQueue = messageQueue; } - /** - * Constructor to open a consumer on single host port for a topic exchange with specific queue name which will - * consume all messages from topic - * @param host - * @param port - * @param userName - * @param password - * @param exchangeName - * @param queueName - * @throws Exception - */ - public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeName, String queueName) throws Exception { - this(host, port, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName, queueName, BINDING_CONSUME_ALL); - } - - /** - * Constructor to open a consumer on single host port for a exchange with specific queue name which will - * consume all messages from topic - * @param host - * @param port - * @param userName - * @param password - * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS - * @param exchangeName - * @param queueName - * @throws Exception - */ - public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeType, String exchangeName, String queueName) throws Exception { - this(host, port, userName, password, exchangeType, exchangeName, queueName, BINDING_CONSUME_ALL); - } - - /** - * Constructor to open a consumer on single host port for a exchange with specific queue name - * @param host - * @param port - * @param userName - * @param password - * @param exchangeType - * @param exchangeName - * @param queueName - * @param bindingKey - Bind the queue to specific messages only - * @throws Exception - */ - public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeType, String exchangeName, String queueName, String bindingKey) throws Exception { - this(buildMap(host, port), userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName, queueName, bindingKey, new HashMap<String, Object>()); - } - - /** - * Constructor to open a consumer on multiple host port for a topic exchange with specific queue name which will - * consume all messages from topic - * @param hostPortMap - * @param userName - * @param password - * @param exchangeName - * @param queueName - * @throws Exception - */ - public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeName, String queueName) throws Exception { - this(hostPortMap, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName, queueName, BINDING_CONSUME_ALL); - } - - /** - * Constructor to open a consumer on multiple host port for a exchange with specific queue name which will - * consume all messages from topic - * @param hostPortMap - * @param userName - * @param password - * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS - * @param exchangeName - * @param queueName - * @throws Exception - */ - public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String queueName) throws Exception { - this(hostPortMap, userName, password, exchangeType, exchangeName, queueName, BINDING_CONSUME_ALL); - } - - /** - * Constructor to open a consumer on multiple host port for a exchange with specific queue name - * @param hostPortMap - * @param userName - * @param password - * @param exchangeType - * @param exchangeName - * @param queueName - * @param bindingKey - Bind the queue to specific messages only - * @throws Exception - */ - public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String queueName, String bindingKey) throws Exception { - this(hostPortMap, userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName, queueName, bindingKey, new HashMap<String, Object>()); - } - - public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, BuiltinExchangeType exchangeType, String exchangeName, String queueName, String bindingKey, Map<String, Object> exchangeArguments) throws Exception { - messageQueue = new ArrayBlockingQueue<>(1000); - List<Address> addresses = new ArrayList<Address>(); - Iterator<String> iter = hostPortMap.keySet().iterator(); - while (iter.hasNext()) - { - String host = iter.next(); - int port = hostPortMap.get(host); - Address add = new Address(host,port); - addresses.add(add); - } - factory.setUsername(userName); - factory.setPassword(password); - connection = factory.newConnection(addresses); - channel = connection.createChannel(); - exchangeInfo = channel.exchangeDeclare(exchangeName, exchangeType, true, false, exchangeArguments); - this.queueName = queueName; - channel.queueDeclare(queueName, true, false, false, null); - channel.queueBind(queueName, exchangeName, bindingKey); - String consumerTag = channel.basicConsume(queueName, false, new CallBackConsumer(channel)); //AutoAck is false - } - - public RabbitMqEventConsumer(String host, int port, String userName, String password, String queue) throws Exception { - this(buildMap(host, port), userName, password, queue, new HashMap<String, Object>()); - } - - public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String queue, Map<String, Object> queueArguments) throws Exception { - messageQueue = new ArrayBlockingQueue<>(1000); - List<Address> addresses = new ArrayList<Address>(); - Iterator<String> iter = hostPortMap.keySet().iterator(); - while (iter.hasNext()) - { - String host = iter.next(); - int port = hostPortMap.get(host); - Address add = new Address(host,port); - addresses.add(add); - } - factory.setUsername(userName); - factory.setPassword(password); - connection = factory.newConnection(addresses); - channel = connection.createChannel(); - this.queueName = queue; - queueInfo = channel.queueDeclare(queueName, true, false, false, queueArguments); - String consumerTag = channel.basicConsume(queueName, false, new CallBackConsumer(channel)); //AutoAck is false - } - - private static Map<String, Integer> buildMap(String host, Integer port) { - Map<String, Integer> hostPortMap = new HashMap<String, Integer>(); - hostPortMap.put(host, port); - return hostPortMap; - } + public RabbitMqEventConsumer(RabbitMqClientConfig config) throws Exception { + this.config = config; + this.messageQueue = new ArrayBlockingQueue<>(1000); + + if (config.getQueue() == null) { + throw new ConfigurationException("Mandatory config param queue not set"); + } + + this.connection = RabbitMqUtils.createConnection(config); + this.channel = connection.createChannel(); + + if (config.getExchangeName() != null) { + channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); + channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); + channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey()); + } + else { + channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); + } + + channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false + } + @Override public Iterable<String> consumeAndCommit() throws Exception { |