diff options
Diffstat (limited to 'event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java')
-rw-r--r-- | event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java | 265 |
1 files changed, 162 insertions, 103 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java index c10b46c..340c68f 100644 --- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java @@ -28,6 +28,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import javax.naming.ConfigurationException; +import javax.naming.ServiceUnavailableException; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; @@ -51,127 +52,185 @@ import com.rabbitmq.client.AMQP; * */ public class RabbitMqEventConsumer implements EventConsumer { - private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class); + private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class); - private BlockingQueue<MessageWithOffset> messageQueue; + private BlockingQueue<MessageWithOffset> messageQueue; + + private Connection connection; + private Channel channel; + private Long lastDeliveryTag; + private long timeout = 5000; + private RabbitMqClientConfig config; + private long lastConnectionAttempt = 0L; - private final Connection connection; - private final Channel channel; - private Long lastDeliveryTag; - private long timeout = 5000; - private RabbitMqClientConfig config; - - /** * (intended for testing prupose only) * @param messageQueue */ - public void setMessageQueue(BlockingQueue<MessageWithOffset> messageQueue) { - this.messageQueue = messageQueue; - } + public void setMessageQueue(BlockingQueue<MessageWithOffset> messageQueue) { + this.messageQueue = messageQueue; + } public RabbitMqEventConsumer(RabbitMqClientConfig config) throws Exception { this.config = config; this.messageQueue = new ArrayBlockingQueue<>(1000); - + if (config.getQueue() == null) { throw new ConfigurationException("Mandatory config param queue not set"); } - - this.connection = RabbitMqUtils.createConnection(config); - this.channel = connection.createChannel(); - - if (config.getExchangeName() != null) { - channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); - channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); - channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey()); + + try { + createConnection(); + } + catch (ConfigurationException ex) { + // If the configuration is bad, we may as well re-throw the exception and let the process die. + throw ex; + } + catch (Exception ex) { + // Otherwise, let the process live. We can retry establishing a connection later. + log.error(RabbitMqApplicationMsgs.MESSAGE_ERROR, "Unable to connect to RMQ: " + ex.getMessage()); + return; + } + } + + private synchronized void createConnection() throws Exception { + if (connection != null) { + return; + } + + long currentTime = System.currentTimeMillis(); + + if ( (currentTime - config.getRetryInterval()) < lastConnectionAttempt) { + log.warn(RabbitMqApplicationMsgs.MESSAGE_WARN, "No attempt to consume. No connection to RMQ."); + throw new ServiceUnavailableException("Waiting for retry interval"); + } + + lastConnectionAttempt = currentTime; + + try { + this.connection = RabbitMqUtils.createConnection(config); + this.channel = connection.createChannel(); + + if (config.getExchangeName() != null) { + channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); + channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); + channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey()); + } + else { + channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); + } + + channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false } - else { - channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); + catch (Exception ex) { + close(); + throw ex; } - - channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false + + log.info(RabbitMqApplicationMsgs.MESSAGE_INFO, "Event consumer successfully connected to RMQ"); } - - @Override - public Iterable<String> consumeAndCommit() throws Exception { - Iterable<String> list = consume(); - commitOffsets(); - return list; - } - - @Override - public Iterable<String> consume() throws Exception { - List<String> list = new ArrayList<>(); - MessageWithOffset record = null; - if(messageQueue.peek()!=null) { - do - { - record = messageQueue.poll(1000, TimeUnit.MILLISECONDS); - lastDeliveryTag = record.getOffset(); - list.add(record.getMessage()); - }while(messageQueue.peek()!=null); - } - return list; - } - - @Override - public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception { - List<MessageWithOffset> list = new ArrayList<>(); - MessageWithOffset record = null; - if(messageQueue.peek()!=null) { - do - { - record = messageQueue.poll(1000, TimeUnit.MILLISECONDS); - lastDeliveryTag = record.getOffset(); - list.add(record); - }while(messageQueue.peek()!=null); - } + @Override + public Iterable<String> consumeAndCommit() throws Exception { + Iterable<String> list = consume(); + commitOffsets(); return list; - } - - @Override - public void commitOffsets() throws Exception { - if(lastDeliveryTag != null) - { - channel.basicAck(lastDeliveryTag, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked - lastDeliveryTag = null; - } - } - - @Override - public void commitOffsets(long offset) throws Exception { - channel.basicAck(offset, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked - } - - /** - * Closes the channel - * @throws Exception - */ - public void close() throws Exception { - channel.close(); - connection.close(); - } - - class CallBackConsumer extends DefaultConsumer{ - CallBackConsumer(Channel channel) { - super(channel); - } + } + + @Override + public Iterable<String> consume() throws Exception { + if (connection == null) { + createConnection(); + } + + List<String> list = new ArrayList<>(); + MessageWithOffset record = null; + if(messageQueue.peek()!=null) { + do + { + record = messageQueue.poll(1000, TimeUnit.MILLISECONDS); + lastDeliveryTag = record.getOffset(); + list.add(record.getMessage()); + }while(messageQueue.peek()!=null); + } + return list; + } + + @Override + public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception { + if (connection == null) { + createConnection(); + } + + List<MessageWithOffset> list = new ArrayList<>(); + MessageWithOffset record = null; + if(messageQueue.peek()!=null) { + do + { + record = messageQueue.poll(1000, TimeUnit.MILLISECONDS); + lastDeliveryTag = record.getOffset(); + list.add(record); + }while(messageQueue.peek()!=null); + } + return list; + } + + @Override + public void commitOffsets() throws Exception { + if (connection == null) { + createConnection(); + } + + if(lastDeliveryTag != null) + { + channel.basicAck(lastDeliveryTag, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked + lastDeliveryTag = null; + } + } + + @Override + public void commitOffsets(long offset) throws Exception { + if (connection == null) { + createConnection(); + } + + channel.basicAck(offset, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked + } + + /** + * Closes the channel + * @throws Exception + */ + public void close() throws Exception { + if (channel != null) { + channel.close(); + channel = null; + } + + if (connection != null) { + connection.close(); + connection = null; + } + } + + class CallBackConsumer extends DefaultConsumer{ + CallBackConsumer(Channel channel) { + super(channel); + } @Override public void handleDelivery(String consumerTag, Envelope envelope, - AMQP.BasicProperties properties, byte[] body) throws IOException { - String message = new String(body, "UTF-8"); - try - { - MessageWithOffset record = new MessageWithOffset(envelope.getDeliveryTag(), message); - messageQueue.offer(record, timeout, TimeUnit.MILLISECONDS); - } - catch(Exception e) - { - log.debug(" Got exception while handling message="+e.getMessage()+" Will be reposting to queue"); - channel.basicNack(envelope.getDeliveryTag(), false, true); //Explicit Ack with requeue - } - } - } + AMQP.BasicProperties properties, byte[] body) throws IOException { + String message = new String(body, "UTF-8"); + try + { + MessageWithOffset record = new MessageWithOffset(envelope.getDeliveryTag(), message); + messageQueue.offer(record, timeout, TimeUnit.MILLISECONDS); + } + catch(Exception e) + { + log.debug(" Got exception while handling message="+e.getMessage()+" Will be reposting to queue"); + channel.basicNack(envelope.getDeliveryTag(), false, true); //Explicit Ack with requeue + } + } + } } |