diff options
Diffstat (limited to 'event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java')
-rw-r--r-- | event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java | 82 |
1 files changed, 72 insertions, 10 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java index 614ae54..a4c6105 100644 --- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java @@ -24,6 +24,7 @@ package org.onap.aai.event.client; import java.util.Collection; import javax.naming.ConfigurationException; +import javax.naming.ServiceUnavailableException; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; @@ -41,29 +42,45 @@ import com.rabbitmq.client.Connection; public class RabbitMqEventPublisher implements EventPublisher { private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventPublisher.class); - private final Connection connection; - private final Channel channel; + private Connection connection; + private Channel channel; private RabbitMqClientConfig config; + + private long lastConnectionAttempt = 0L; public RabbitMqEventPublisher(RabbitMqClientConfig config) throws Exception { this.config = config; - + if (config.getExchangeName() == null) { throw new ConfigurationException("Mandatory config param exchangeName not set"); } - - this.connection = RabbitMqUtils.createConnection(config); - this.channel = connection.createChannel(); - //Durable exchange and non delete - channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); + try { + createConnection(); + } + catch (ConfigurationException ex) { + // If the configuration is bad, we may as well re-throw the exception and let the process die. + throw ex; + } + catch (Exception ex) { + // Otherwise, let the process live. We can retry establishing a connection later. + log.error(RabbitMqApplicationMsgs.MESSAGE_ERROR, "Unable to connect to RMQ: " + ex.getMessage()); + return; + } } @Override public void close() throws Exception { - channel.close(); - connection.close(); + if (channel != null) { + channel.close(); + channel = null; + } + + if (connection != null) { + connection.close(); + connection = null; + } } @Override @@ -88,6 +105,10 @@ public class RabbitMqEventPublisher implements EventPublisher { @Override public int sendSync(String message) throws Exception { + if (connection == null) { + createConnection(); + } + channel.basicPublish(config.getExchangeName(), config.getRoutingKey(), null, message.getBytes()); log.debug(" [x] Sent '" + message + "'"); return 1; @@ -95,6 +116,10 @@ public class RabbitMqEventPublisher implements EventPublisher { @Override public int sendSync(Collection<String> messages) throws Exception { + if (connection == null) { + createConnection(); + } + log.debug("Publishing" + messages.size() + " messages "); for (String message : messages) { sendSync(message); @@ -104,17 +129,54 @@ public class RabbitMqEventPublisher implements EventPublisher { @Override public int sendSync(String routingParam, String message) throws Exception { + if (connection == null) { + createConnection(); + } + channel.basicPublish(config.getExchangeName(), routingParam, null, message.getBytes()); return 1; } @Override public int sendSync(String routingParam, Collection<String> messages) throws Exception { + if (connection == null) { + createConnection(); + } + log.debug("Publishing" + messages.size() + " messages "); for (String message : messages) { sendSync(routingParam, message); } return messages.size(); } + + private synchronized void createConnection() throws Exception { + if (connection != null) { + return; + } + + long currentTime = System.currentTimeMillis(); + + if ( (currentTime - config.getRetryInterval()) < lastConnectionAttempt) { + log.warn(RabbitMqApplicationMsgs.MESSAGE_WARN, "Drop event. No connection to RMQ."); + throw new ServiceUnavailableException("Waiting for retry interval"); + } + + lastConnectionAttempt = currentTime; + + try { + this.connection = RabbitMqUtils.createConnection(config); + this.channel = connection.createChannel(); + + //Durable exchange and non delete + channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); + } + catch (Exception ex) { + close(); + throw ex; + } + + log.info(RabbitMqApplicationMsgs.MESSAGE_INFO, "Event publisher successfully connected to RMQ"); + } } |