diff options
author | sblimkie <steven.blimkie@amdocs.com> | 2019-02-21 13:37:16 -0500 |
---|---|---|
committer | sblimkie <steven.blimkie@amdocs.com> | 2019-02-21 14:00:01 -0500 |
commit | 27bcf86da3ba3a68c209db411292ea090fd8df52 (patch) | |
tree | da63232903df4b6c2fe42b1ed16fcb01a3bfaa95 /event-client-rabbitmq/src/main | |
parent | f2daa751f0c6e838316aa90f6a25ed267c8c6d48 (diff) |
Event client to retry in case of failed connection
On initial creation of the event client consumer/publisher
handle a failure to connect to RMQ gracefully such that the
application can still start up and re-try connecting at a later
time
Change-Id: I4ba8e2d2a8b4e4e5a60255550bb74a501494c9bd
Issue-ID: AAI-2185
Signed-off-by: sblimkie <steven.blimkie@amdocs.com>
Diffstat (limited to 'event-client-rabbitmq/src/main')
5 files changed, 325 insertions, 113 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqApplicationMsgs.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqApplicationMsgs.java new file mode 100644 index 0000000..22df2b6 --- /dev/null +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqApplicationMsgs.java @@ -0,0 +1,39 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2019 AT&T Intellectual Property. All rights reserved. + * Copyright © 2019 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event.client; + +import com.att.eelf.i18n.EELFResourceManager; +import org.onap.aai.cl.eelf.LogMessageEnum; + +/** + * Logger messages + */ +public enum RabbitMqApplicationMsgs implements LogMessageEnum { + + MESSAGE_ERROR, MESSAGE_INFO, MESSAGE_WARN; + + /** + * Static initializer to ensure the resource bundles for this class are loaded... + */ + static { + EELFResourceManager.loadMessageBundle("resources"); + } +} diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java index d177989..108f730 100644 --- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java @@ -29,6 +29,7 @@ import com.rabbitmq.client.BuiltinExchangeType; public class RabbitMqClientConfig { private static String BINDING_CONSUME_ALL = "#"; + private static Integer DEFAULT_RETRY_INTERVAL = 15000; private String hosts; private String username; @@ -46,6 +47,7 @@ public class RabbitMqClientConfig { private String sslKeyStorePassword; private String sslTrustStorePassword; private String connectionTimeout; + private Integer retryInterval; private String enableSsl; public String getUsername() { @@ -181,6 +183,18 @@ public class RabbitMqClientConfig { this.connectionTimeout = connectionTimeout; } + public Integer getRetryInterval() { + if (retryInterval == null) { + return DEFAULT_RETRY_INTERVAL; + } + + return retryInterval; + } + + public void setRetryInterval(Integer retryInterval) { + this.retryInterval = retryInterval; + } + public Boolean getEnableSsl() { if (enableSsl != null) { return Boolean.valueOf(enableSsl); diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java index c10b46c..340c68f 100644 --- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java @@ -28,6 +28,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import javax.naming.ConfigurationException; +import javax.naming.ServiceUnavailableException; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; @@ -51,127 +52,185 @@ import com.rabbitmq.client.AMQP; * */ public class RabbitMqEventConsumer implements EventConsumer { - private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class); + private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class); - private BlockingQueue<MessageWithOffset> messageQueue; + private BlockingQueue<MessageWithOffset> messageQueue; + + private Connection connection; + private Channel channel; + private Long lastDeliveryTag; + private long timeout = 5000; + private RabbitMqClientConfig config; + private long lastConnectionAttempt = 0L; - private final Connection connection; - private final Channel channel; - private Long lastDeliveryTag; - private long timeout = 5000; - private RabbitMqClientConfig config; - - /** * (intended for testing prupose only) * @param messageQueue */ - public void setMessageQueue(BlockingQueue<MessageWithOffset> messageQueue) { - this.messageQueue = messageQueue; - } + public void setMessageQueue(BlockingQueue<MessageWithOffset> messageQueue) { + this.messageQueue = messageQueue; + } public RabbitMqEventConsumer(RabbitMqClientConfig config) throws Exception { this.config = config; this.messageQueue = new ArrayBlockingQueue<>(1000); - + if (config.getQueue() == null) { throw new ConfigurationException("Mandatory config param queue not set"); } - - this.connection = RabbitMqUtils.createConnection(config); - this.channel = connection.createChannel(); - - if (config.getExchangeName() != null) { - channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); - channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); - channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey()); + + try { + createConnection(); + } + catch (ConfigurationException ex) { + // If the configuration is bad, we may as well re-throw the exception and let the process die. + throw ex; + } + catch (Exception ex) { + // Otherwise, let the process live. We can retry establishing a connection later. + log.error(RabbitMqApplicationMsgs.MESSAGE_ERROR, "Unable to connect to RMQ: " + ex.getMessage()); + return; + } + } + + private synchronized void createConnection() throws Exception { + if (connection != null) { + return; + } + + long currentTime = System.currentTimeMillis(); + + if ( (currentTime - config.getRetryInterval()) < lastConnectionAttempt) { + log.warn(RabbitMqApplicationMsgs.MESSAGE_WARN, "No attempt to consume. No connection to RMQ."); + throw new ServiceUnavailableException("Waiting for retry interval"); + } + + lastConnectionAttempt = currentTime; + + try { + this.connection = RabbitMqUtils.createConnection(config); + this.channel = connection.createChannel(); + + if (config.getExchangeName() != null) { + channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); + channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); + channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey()); + } + else { + channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); + } + + channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false } - else { - channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); + catch (Exception ex) { + close(); + throw ex; } - - channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false + + log.info(RabbitMqApplicationMsgs.MESSAGE_INFO, "Event consumer successfully connected to RMQ"); } - - @Override - public Iterable<String> consumeAndCommit() throws Exception { - Iterable<String> list = consume(); - commitOffsets(); - return list; - } - - @Override - public Iterable<String> consume() throws Exception { - List<String> list = new ArrayList<>(); - MessageWithOffset record = null; - if(messageQueue.peek()!=null) { - do - { - record = messageQueue.poll(1000, TimeUnit.MILLISECONDS); - lastDeliveryTag = record.getOffset(); - list.add(record.getMessage()); - }while(messageQueue.peek()!=null); - } - return list; - } - - @Override - public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception { - List<MessageWithOffset> list = new ArrayList<>(); - MessageWithOffset record = null; - if(messageQueue.peek()!=null) { - do - { - record = messageQueue.poll(1000, TimeUnit.MILLISECONDS); - lastDeliveryTag = record.getOffset(); - list.add(record); - }while(messageQueue.peek()!=null); - } + @Override + public Iterable<String> consumeAndCommit() throws Exception { + Iterable<String> list = consume(); + commitOffsets(); return list; - } - - @Override - public void commitOffsets() throws Exception { - if(lastDeliveryTag != null) - { - channel.basicAck(lastDeliveryTag, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked - lastDeliveryTag = null; - } - } - - @Override - public void commitOffsets(long offset) throws Exception { - channel.basicAck(offset, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked - } - - /** - * Closes the channel - * @throws Exception - */ - public void close() throws Exception { - channel.close(); - connection.close(); - } - - class CallBackConsumer extends DefaultConsumer{ - CallBackConsumer(Channel channel) { - super(channel); - } + } + + @Override + public Iterable<String> consume() throws Exception { + if (connection == null) { + createConnection(); + } + + List<String> list = new ArrayList<>(); + MessageWithOffset record = null; + if(messageQueue.peek()!=null) { + do + { + record = messageQueue.poll(1000, TimeUnit.MILLISECONDS); + lastDeliveryTag = record.getOffset(); + list.add(record.getMessage()); + }while(messageQueue.peek()!=null); + } + return list; + } + + @Override + public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception { + if (connection == null) { + createConnection(); + } + + List<MessageWithOffset> list = new ArrayList<>(); + MessageWithOffset record = null; + if(messageQueue.peek()!=null) { + do + { + record = messageQueue.poll(1000, TimeUnit.MILLISECONDS); + lastDeliveryTag = record.getOffset(); + list.add(record); + }while(messageQueue.peek()!=null); + } + return list; + } + + @Override + public void commitOffsets() throws Exception { + if (connection == null) { + createConnection(); + } + + if(lastDeliveryTag != null) + { + channel.basicAck(lastDeliveryTag, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked + lastDeliveryTag = null; + } + } + + @Override + public void commitOffsets(long offset) throws Exception { + if (connection == null) { + createConnection(); + } + + channel.basicAck(offset, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked + } + + /** + * Closes the channel + * @throws Exception + */ + public void close() throws Exception { + if (channel != null) { + channel.close(); + channel = null; + } + + if (connection != null) { + connection.close(); + connection = null; + } + } + + class CallBackConsumer extends DefaultConsumer{ + CallBackConsumer(Channel channel) { + super(channel); + } @Override public void handleDelivery(String consumerTag, Envelope envelope, - AMQP.BasicProperties properties, byte[] body) throws IOException { - String message = new String(body, "UTF-8"); - try - { - MessageWithOffset record = new MessageWithOffset(envelope.getDeliveryTag(), message); - messageQueue.offer(record, timeout, TimeUnit.MILLISECONDS); - } - catch(Exception e) - { - log.debug(" Got exception while handling message="+e.getMessage()+" Will be reposting to queue"); - channel.basicNack(envelope.getDeliveryTag(), false, true); //Explicit Ack with requeue - } - } - } + AMQP.BasicProperties properties, byte[] body) throws IOException { + String message = new String(body, "UTF-8"); + try + { + MessageWithOffset record = new MessageWithOffset(envelope.getDeliveryTag(), message); + messageQueue.offer(record, timeout, TimeUnit.MILLISECONDS); + } + catch(Exception e) + { + log.debug(" Got exception while handling message="+e.getMessage()+" Will be reposting to queue"); + channel.basicNack(envelope.getDeliveryTag(), false, true); //Explicit Ack with requeue + } + } + } } diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java index 614ae54..a4c6105 100644 --- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java @@ -24,6 +24,7 @@ package org.onap.aai.event.client; import java.util.Collection; import javax.naming.ConfigurationException; +import javax.naming.ServiceUnavailableException; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; @@ -41,29 +42,45 @@ import com.rabbitmq.client.Connection; public class RabbitMqEventPublisher implements EventPublisher { private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventPublisher.class); - private final Connection connection; - private final Channel channel; + private Connection connection; + private Channel channel; private RabbitMqClientConfig config; + + private long lastConnectionAttempt = 0L; public RabbitMqEventPublisher(RabbitMqClientConfig config) throws Exception { this.config = config; - + if (config.getExchangeName() == null) { throw new ConfigurationException("Mandatory config param exchangeName not set"); } - - this.connection = RabbitMqUtils.createConnection(config); - this.channel = connection.createChannel(); - //Durable exchange and non delete - channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); + try { + createConnection(); + } + catch (ConfigurationException ex) { + // If the configuration is bad, we may as well re-throw the exception and let the process die. + throw ex; + } + catch (Exception ex) { + // Otherwise, let the process live. We can retry establishing a connection later. + log.error(RabbitMqApplicationMsgs.MESSAGE_ERROR, "Unable to connect to RMQ: " + ex.getMessage()); + return; + } } @Override public void close() throws Exception { - channel.close(); - connection.close(); + if (channel != null) { + channel.close(); + channel = null; + } + + if (connection != null) { + connection.close(); + connection = null; + } } @Override @@ -88,6 +105,10 @@ public class RabbitMqEventPublisher implements EventPublisher { @Override public int sendSync(String message) throws Exception { + if (connection == null) { + createConnection(); + } + channel.basicPublish(config.getExchangeName(), config.getRoutingKey(), null, message.getBytes()); log.debug(" [x] Sent '" + message + "'"); return 1; @@ -95,6 +116,10 @@ public class RabbitMqEventPublisher implements EventPublisher { @Override public int sendSync(Collection<String> messages) throws Exception { + if (connection == null) { + createConnection(); + } + log.debug("Publishing" + messages.size() + " messages "); for (String message : messages) { sendSync(message); @@ -104,17 +129,54 @@ public class RabbitMqEventPublisher implements EventPublisher { @Override public int sendSync(String routingParam, String message) throws Exception { + if (connection == null) { + createConnection(); + } + channel.basicPublish(config.getExchangeName(), routingParam, null, message.getBytes()); return 1; } @Override public int sendSync(String routingParam, Collection<String> messages) throws Exception { + if (connection == null) { + createConnection(); + } + log.debug("Publishing" + messages.size() + " messages "); for (String message : messages) { sendSync(routingParam, message); } return messages.size(); } + + private synchronized void createConnection() throws Exception { + if (connection != null) { + return; + } + + long currentTime = System.currentTimeMillis(); + + if ( (currentTime - config.getRetryInterval()) < lastConnectionAttempt) { + log.warn(RabbitMqApplicationMsgs.MESSAGE_WARN, "Drop event. No connection to RMQ."); + throw new ServiceUnavailableException("Waiting for retry interval"); + } + + lastConnectionAttempt = currentTime; + + try { + this.connection = RabbitMqUtils.createConnection(config); + this.channel = connection.createChannel(); + + //Durable exchange and non delete + channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); + } + catch (Exception ex) { + close(); + throw ex; + } + + log.info(RabbitMqApplicationMsgs.MESSAGE_INFO, "Event publisher successfully connected to RMQ"); + } } diff --git a/event-client-rabbitmq/src/main/resources/resources.properties b/event-client-rabbitmq/src/main/resources/resources.properties new file mode 100644 index 0000000..1e148d2 --- /dev/null +++ b/event-client-rabbitmq/src/main/resources/resources.properties @@ -0,0 +1,38 @@ +# +# ============LICENSE_START======================================================= +# org.onap.aai +# ================================================================================ +# Copyright © 2019 AT&T Intellectual Property. All rights reserved. +# Copyright © 2019 European Software Marketing Ltd. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# + +MESSAGE_INFO=\ + |\ + {0}|\ + |\ + |\ + +MESSAGE_ERROR=\ + |\ + {0}|\ + |\ + |\ + +MESSAGE_WARN=\ + |\ + {0}|\ + |\ + |\ |