aboutsummaryrefslogtreecommitdiffstats
path: root/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java')
-rw-r--r--event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java82
1 files changed, 72 insertions, 10 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java
index 614ae54..a4c6105 100644
--- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java
+++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java
@@ -24,6 +24,7 @@ package org.onap.aai.event.client;
import java.util.Collection;
import javax.naming.ConfigurationException;
+import javax.naming.ServiceUnavailableException;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
@@ -41,29 +42,45 @@ import com.rabbitmq.client.Connection;
public class RabbitMqEventPublisher implements EventPublisher {
private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventPublisher.class);
- private final Connection connection;
- private final Channel channel;
+ private Connection connection;
+ private Channel channel;
private RabbitMqClientConfig config;
+
+ private long lastConnectionAttempt = 0L;
public RabbitMqEventPublisher(RabbitMqClientConfig config) throws Exception {
this.config = config;
-
+
if (config.getExchangeName() == null) {
throw new ConfigurationException("Mandatory config param exchangeName not set");
}
-
- this.connection = RabbitMqUtils.createConnection(config);
- this.channel = connection.createChannel();
- //Durable exchange and non delete
- channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments());
+ try {
+ createConnection();
+ }
+ catch (ConfigurationException ex) {
+ // If the configuration is bad, we may as well re-throw the exception and let the process die.
+ throw ex;
+ }
+ catch (Exception ex) {
+ // Otherwise, let the process live. We can retry establishing a connection later.
+ log.error(RabbitMqApplicationMsgs.MESSAGE_ERROR, "Unable to connect to RMQ: " + ex.getMessage());
+ return;
+ }
}
@Override
public void close() throws Exception {
- channel.close();
- connection.close();
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
}
@Override
@@ -88,6 +105,10 @@ public class RabbitMqEventPublisher implements EventPublisher {
@Override
public int sendSync(String message) throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
channel.basicPublish(config.getExchangeName(), config.getRoutingKey(), null, message.getBytes());
log.debug(" [x] Sent '" + message + "'");
return 1;
@@ -95,6 +116,10 @@ public class RabbitMqEventPublisher implements EventPublisher {
@Override
public int sendSync(Collection<String> messages) throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
log.debug("Publishing" + messages.size() + " messages ");
for (String message : messages) {
sendSync(message);
@@ -104,17 +129,54 @@ public class RabbitMqEventPublisher implements EventPublisher {
@Override
public int sendSync(String routingParam, String message) throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
channel.basicPublish(config.getExchangeName(), routingParam, null, message.getBytes());
return 1;
}
@Override
public int sendSync(String routingParam, Collection<String> messages) throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
log.debug("Publishing" + messages.size() + " messages ");
for (String message : messages) {
sendSync(routingParam, message);
}
return messages.size();
}
+
+ private synchronized void createConnection() throws Exception {
+ if (connection != null) {
+ return;
+ }
+
+ long currentTime = System.currentTimeMillis();
+
+ if ( (currentTime - config.getRetryInterval()) < lastConnectionAttempt) {
+ log.warn(RabbitMqApplicationMsgs.MESSAGE_WARN, "Drop event. No connection to RMQ.");
+ throw new ServiceUnavailableException("Waiting for retry interval");
+ }
+
+ lastConnectionAttempt = currentTime;
+
+ try {
+ this.connection = RabbitMqUtils.createConnection(config);
+ this.channel = connection.createChannel();
+
+ //Durable exchange and non delete
+ channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments());
+ }
+ catch (Exception ex) {
+ close();
+ throw ex;
+ }
+
+ log.info(RabbitMqApplicationMsgs.MESSAGE_INFO, "Event publisher successfully connected to RMQ");
+ }
}