aboutsummaryrefslogtreecommitdiffstats
path: root/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java')
-rw-r--r--event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java265
1 files changed, 162 insertions, 103 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
index c10b46c..340c68f 100644
--- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
+++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
@@ -28,6 +28,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.naming.ConfigurationException;
+import javax.naming.ServiceUnavailableException;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
@@ -51,127 +52,185 @@ import com.rabbitmq.client.AMQP;
*
*/
public class RabbitMqEventConsumer implements EventConsumer {
- private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class);
+ private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class);
- private BlockingQueue<MessageWithOffset> messageQueue;
+ private BlockingQueue<MessageWithOffset> messageQueue;
+
+ private Connection connection;
+ private Channel channel;
+ private Long lastDeliveryTag;
+ private long timeout = 5000;
+ private RabbitMqClientConfig config;
+ private long lastConnectionAttempt = 0L;
- private final Connection connection;
- private final Channel channel;
- private Long lastDeliveryTag;
- private long timeout = 5000;
- private RabbitMqClientConfig config;
-
-
/**
* (intended for testing prupose only)
* @param messageQueue
*/
- public void setMessageQueue(BlockingQueue<MessageWithOffset> messageQueue) {
- this.messageQueue = messageQueue;
- }
+ public void setMessageQueue(BlockingQueue<MessageWithOffset> messageQueue) {
+ this.messageQueue = messageQueue;
+ }
public RabbitMqEventConsumer(RabbitMqClientConfig config) throws Exception {
this.config = config;
this.messageQueue = new ArrayBlockingQueue<>(1000);
-
+
if (config.getQueue() == null) {
throw new ConfigurationException("Mandatory config param queue not set");
}
-
- this.connection = RabbitMqUtils.createConnection(config);
- this.channel = connection.createChannel();
-
- if (config.getExchangeName() != null) {
- channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments());
- channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments());
- channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey());
+
+ try {
+ createConnection();
+ }
+ catch (ConfigurationException ex) {
+ // If the configuration is bad, we may as well re-throw the exception and let the process die.
+ throw ex;
+ }
+ catch (Exception ex) {
+ // Otherwise, let the process live. We can retry establishing a connection later.
+ log.error(RabbitMqApplicationMsgs.MESSAGE_ERROR, "Unable to connect to RMQ: " + ex.getMessage());
+ return;
+ }
+ }
+
+ private synchronized void createConnection() throws Exception {
+ if (connection != null) {
+ return;
+ }
+
+ long currentTime = System.currentTimeMillis();
+
+ if ( (currentTime - config.getRetryInterval()) < lastConnectionAttempt) {
+ log.warn(RabbitMqApplicationMsgs.MESSAGE_WARN, "No attempt to consume. No connection to RMQ.");
+ throw new ServiceUnavailableException("Waiting for retry interval");
+ }
+
+ lastConnectionAttempt = currentTime;
+
+ try {
+ this.connection = RabbitMqUtils.createConnection(config);
+ this.channel = connection.createChannel();
+
+ if (config.getExchangeName() != null) {
+ channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments());
+ channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments());
+ channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey());
+ }
+ else {
+ channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments());
+ }
+
+ channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false
}
- else {
- channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments());
+ catch (Exception ex) {
+ close();
+ throw ex;
}
-
- channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false
+
+ log.info(RabbitMqApplicationMsgs.MESSAGE_INFO, "Event consumer successfully connected to RMQ");
}
-
- @Override
- public Iterable<String> consumeAndCommit() throws Exception {
- Iterable<String> list = consume();
- commitOffsets();
- return list;
- }
-
- @Override
- public Iterable<String> consume() throws Exception {
- List<String> list = new ArrayList<>();
- MessageWithOffset record = null;
- if(messageQueue.peek()!=null) {
- do
- {
- record = messageQueue.poll(1000, TimeUnit.MILLISECONDS);
- lastDeliveryTag = record.getOffset();
- list.add(record.getMessage());
- }while(messageQueue.peek()!=null);
- }
- return list;
- }
-
- @Override
- public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception {
- List<MessageWithOffset> list = new ArrayList<>();
- MessageWithOffset record = null;
- if(messageQueue.peek()!=null) {
- do
- {
- record = messageQueue.poll(1000, TimeUnit.MILLISECONDS);
- lastDeliveryTag = record.getOffset();
- list.add(record);
- }while(messageQueue.peek()!=null);
- }
+ @Override
+ public Iterable<String> consumeAndCommit() throws Exception {
+ Iterable<String> list = consume();
+ commitOffsets();
return list;
- }
-
- @Override
- public void commitOffsets() throws Exception {
- if(lastDeliveryTag != null)
- {
- channel.basicAck(lastDeliveryTag, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked
- lastDeliveryTag = null;
- }
- }
-
- @Override
- public void commitOffsets(long offset) throws Exception {
- channel.basicAck(offset, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked
- }
-
- /**
- * Closes the channel
- * @throws Exception
- */
- public void close() throws Exception {
- channel.close();
- connection.close();
- }
-
- class CallBackConsumer extends DefaultConsumer{
- CallBackConsumer(Channel channel) {
- super(channel);
- }
+ }
+
+ @Override
+ public Iterable<String> consume() throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
+ List<String> list = new ArrayList<>();
+ MessageWithOffset record = null;
+ if(messageQueue.peek()!=null) {
+ do
+ {
+ record = messageQueue.poll(1000, TimeUnit.MILLISECONDS);
+ lastDeliveryTag = record.getOffset();
+ list.add(record.getMessage());
+ }while(messageQueue.peek()!=null);
+ }
+ return list;
+ }
+
+ @Override
+ public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
+ List<MessageWithOffset> list = new ArrayList<>();
+ MessageWithOffset record = null;
+ if(messageQueue.peek()!=null) {
+ do
+ {
+ record = messageQueue.poll(1000, TimeUnit.MILLISECONDS);
+ lastDeliveryTag = record.getOffset();
+ list.add(record);
+ }while(messageQueue.peek()!=null);
+ }
+ return list;
+ }
+
+ @Override
+ public void commitOffsets() throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
+ if(lastDeliveryTag != null)
+ {
+ channel.basicAck(lastDeliveryTag, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked
+ lastDeliveryTag = null;
+ }
+ }
+
+ @Override
+ public void commitOffsets(long offset) throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
+ channel.basicAck(offset, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked
+ }
+
+ /**
+ * Closes the channel
+ * @throws Exception
+ */
+ public void close() throws Exception {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ }
+
+ class CallBackConsumer extends DefaultConsumer{
+ CallBackConsumer(Channel channel) {
+ super(channel);
+ }
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- try
- {
- MessageWithOffset record = new MessageWithOffset(envelope.getDeliveryTag(), message);
- messageQueue.offer(record, timeout, TimeUnit.MILLISECONDS);
- }
- catch(Exception e)
- {
- log.debug(" Got exception while handling message="+e.getMessage()+" Will be reposting to queue");
- channel.basicNack(envelope.getDeliveryTag(), false, true); //Explicit Ack with requeue
- }
- }
- }
+ AMQP.BasicProperties properties, byte[] body) throws IOException {
+ String message = new String(body, "UTF-8");
+ try
+ {
+ MessageWithOffset record = new MessageWithOffset(envelope.getDeliveryTag(), message);
+ messageQueue.offer(record, timeout, TimeUnit.MILLISECONDS);
+ }
+ catch(Exception e)
+ {
+ log.debug(" Got exception while handling message="+e.getMessage()+" Will be reposting to queue");
+ channel.basicNack(envelope.getDeliveryTag(), false, true); //Explicit Ack with requeue
+ }
+ }
+ }
}