aboutsummaryrefslogtreecommitdiffstats
path: root/event-client-rabbitmq/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'event-client-rabbitmq/src/main')
-rw-r--r--event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqApplicationMsgs.java39
-rw-r--r--event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java14
-rw-r--r--event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java265
-rw-r--r--event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java82
-rw-r--r--event-client-rabbitmq/src/main/resources/resources.properties38
5 files changed, 325 insertions, 113 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqApplicationMsgs.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqApplicationMsgs.java
new file mode 100644
index 0000000..22df2b6
--- /dev/null
+++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqApplicationMsgs.java
@@ -0,0 +1,39 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2019 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event.client;
+
+import com.att.eelf.i18n.EELFResourceManager;
+import org.onap.aai.cl.eelf.LogMessageEnum;
+
+/**
+ * Logger messages
+ */
+public enum RabbitMqApplicationMsgs implements LogMessageEnum {
+
+ MESSAGE_ERROR, MESSAGE_INFO, MESSAGE_WARN;
+
+ /**
+ * Static initializer to ensure the resource bundles for this class are loaded...
+ */
+ static {
+ EELFResourceManager.loadMessageBundle("resources");
+ }
+}
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java
index d177989..108f730 100644
--- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java
+++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java
@@ -29,6 +29,7 @@ import com.rabbitmq.client.BuiltinExchangeType;
public class RabbitMqClientConfig {
private static String BINDING_CONSUME_ALL = "#";
+ private static Integer DEFAULT_RETRY_INTERVAL = 15000;
private String hosts;
private String username;
@@ -46,6 +47,7 @@ public class RabbitMqClientConfig {
private String sslKeyStorePassword;
private String sslTrustStorePassword;
private String connectionTimeout;
+ private Integer retryInterval;
private String enableSsl;
public String getUsername() {
@@ -181,6 +183,18 @@ public class RabbitMqClientConfig {
this.connectionTimeout = connectionTimeout;
}
+ public Integer getRetryInterval() {
+ if (retryInterval == null) {
+ return DEFAULT_RETRY_INTERVAL;
+ }
+
+ return retryInterval;
+ }
+
+ public void setRetryInterval(Integer retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+
public Boolean getEnableSsl() {
if (enableSsl != null) {
return Boolean.valueOf(enableSsl);
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
index c10b46c..340c68f 100644
--- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
+++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java
@@ -28,6 +28,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.naming.ConfigurationException;
+import javax.naming.ServiceUnavailableException;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
@@ -51,127 +52,185 @@ import com.rabbitmq.client.AMQP;
*
*/
public class RabbitMqEventConsumer implements EventConsumer {
- private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class);
+ private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class);
- private BlockingQueue<MessageWithOffset> messageQueue;
+ private BlockingQueue<MessageWithOffset> messageQueue;
+
+ private Connection connection;
+ private Channel channel;
+ private Long lastDeliveryTag;
+ private long timeout = 5000;
+ private RabbitMqClientConfig config;
+ private long lastConnectionAttempt = 0L;
- private final Connection connection;
- private final Channel channel;
- private Long lastDeliveryTag;
- private long timeout = 5000;
- private RabbitMqClientConfig config;
-
-
/**
* (intended for testing prupose only)
* @param messageQueue
*/
- public void setMessageQueue(BlockingQueue<MessageWithOffset> messageQueue) {
- this.messageQueue = messageQueue;
- }
+ public void setMessageQueue(BlockingQueue<MessageWithOffset> messageQueue) {
+ this.messageQueue = messageQueue;
+ }
public RabbitMqEventConsumer(RabbitMqClientConfig config) throws Exception {
this.config = config;
this.messageQueue = new ArrayBlockingQueue<>(1000);
-
+
if (config.getQueue() == null) {
throw new ConfigurationException("Mandatory config param queue not set");
}
-
- this.connection = RabbitMqUtils.createConnection(config);
- this.channel = connection.createChannel();
-
- if (config.getExchangeName() != null) {
- channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments());
- channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments());
- channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey());
+
+ try {
+ createConnection();
+ }
+ catch (ConfigurationException ex) {
+ // If the configuration is bad, we may as well re-throw the exception and let the process die.
+ throw ex;
+ }
+ catch (Exception ex) {
+ // Otherwise, let the process live. We can retry establishing a connection later.
+ log.error(RabbitMqApplicationMsgs.MESSAGE_ERROR, "Unable to connect to RMQ: " + ex.getMessage());
+ return;
+ }
+ }
+
+ private synchronized void createConnection() throws Exception {
+ if (connection != null) {
+ return;
+ }
+
+ long currentTime = System.currentTimeMillis();
+
+ if ( (currentTime - config.getRetryInterval()) < lastConnectionAttempt) {
+ log.warn(RabbitMqApplicationMsgs.MESSAGE_WARN, "No attempt to consume. No connection to RMQ.");
+ throw new ServiceUnavailableException("Waiting for retry interval");
+ }
+
+ lastConnectionAttempt = currentTime;
+
+ try {
+ this.connection = RabbitMqUtils.createConnection(config);
+ this.channel = connection.createChannel();
+
+ if (config.getExchangeName() != null) {
+ channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments());
+ channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments());
+ channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey());
+ }
+ else {
+ channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments());
+ }
+
+ channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false
}
- else {
- channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments());
+ catch (Exception ex) {
+ close();
+ throw ex;
}
-
- channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false
+
+ log.info(RabbitMqApplicationMsgs.MESSAGE_INFO, "Event consumer successfully connected to RMQ");
}
-
- @Override
- public Iterable<String> consumeAndCommit() throws Exception {
- Iterable<String> list = consume();
- commitOffsets();
- return list;
- }
-
- @Override
- public Iterable<String> consume() throws Exception {
- List<String> list = new ArrayList<>();
- MessageWithOffset record = null;
- if(messageQueue.peek()!=null) {
- do
- {
- record = messageQueue.poll(1000, TimeUnit.MILLISECONDS);
- lastDeliveryTag = record.getOffset();
- list.add(record.getMessage());
- }while(messageQueue.peek()!=null);
- }
- return list;
- }
-
- @Override
- public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception {
- List<MessageWithOffset> list = new ArrayList<>();
- MessageWithOffset record = null;
- if(messageQueue.peek()!=null) {
- do
- {
- record = messageQueue.poll(1000, TimeUnit.MILLISECONDS);
- lastDeliveryTag = record.getOffset();
- list.add(record);
- }while(messageQueue.peek()!=null);
- }
+ @Override
+ public Iterable<String> consumeAndCommit() throws Exception {
+ Iterable<String> list = consume();
+ commitOffsets();
return list;
- }
-
- @Override
- public void commitOffsets() throws Exception {
- if(lastDeliveryTag != null)
- {
- channel.basicAck(lastDeliveryTag, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked
- lastDeliveryTag = null;
- }
- }
-
- @Override
- public void commitOffsets(long offset) throws Exception {
- channel.basicAck(offset, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked
- }
-
- /**
- * Closes the channel
- * @throws Exception
- */
- public void close() throws Exception {
- channel.close();
- connection.close();
- }
-
- class CallBackConsumer extends DefaultConsumer{
- CallBackConsumer(Channel channel) {
- super(channel);
- }
+ }
+
+ @Override
+ public Iterable<String> consume() throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
+ List<String> list = new ArrayList<>();
+ MessageWithOffset record = null;
+ if(messageQueue.peek()!=null) {
+ do
+ {
+ record = messageQueue.poll(1000, TimeUnit.MILLISECONDS);
+ lastDeliveryTag = record.getOffset();
+ list.add(record.getMessage());
+ }while(messageQueue.peek()!=null);
+ }
+ return list;
+ }
+
+ @Override
+ public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
+ List<MessageWithOffset> list = new ArrayList<>();
+ MessageWithOffset record = null;
+ if(messageQueue.peek()!=null) {
+ do
+ {
+ record = messageQueue.poll(1000, TimeUnit.MILLISECONDS);
+ lastDeliveryTag = record.getOffset();
+ list.add(record);
+ }while(messageQueue.peek()!=null);
+ }
+ return list;
+ }
+
+ @Override
+ public void commitOffsets() throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
+ if(lastDeliveryTag != null)
+ {
+ channel.basicAck(lastDeliveryTag, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked
+ lastDeliveryTag = null;
+ }
+ }
+
+ @Override
+ public void commitOffsets(long offset) throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
+ channel.basicAck(offset, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked
+ }
+
+ /**
+ * Closes the channel
+ * @throws Exception
+ */
+ public void close() throws Exception {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ }
+
+ class CallBackConsumer extends DefaultConsumer{
+ CallBackConsumer(Channel channel) {
+ super(channel);
+ }
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- try
- {
- MessageWithOffset record = new MessageWithOffset(envelope.getDeliveryTag(), message);
- messageQueue.offer(record, timeout, TimeUnit.MILLISECONDS);
- }
- catch(Exception e)
- {
- log.debug(" Got exception while handling message="+e.getMessage()+" Will be reposting to queue");
- channel.basicNack(envelope.getDeliveryTag(), false, true); //Explicit Ack with requeue
- }
- }
- }
+ AMQP.BasicProperties properties, byte[] body) throws IOException {
+ String message = new String(body, "UTF-8");
+ try
+ {
+ MessageWithOffset record = new MessageWithOffset(envelope.getDeliveryTag(), message);
+ messageQueue.offer(record, timeout, TimeUnit.MILLISECONDS);
+ }
+ catch(Exception e)
+ {
+ log.debug(" Got exception while handling message="+e.getMessage()+" Will be reposting to queue");
+ channel.basicNack(envelope.getDeliveryTag(), false, true); //Explicit Ack with requeue
+ }
+ }
+ }
}
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java
index 614ae54..a4c6105 100644
--- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java
+++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java
@@ -24,6 +24,7 @@ package org.onap.aai.event.client;
import java.util.Collection;
import javax.naming.ConfigurationException;
+import javax.naming.ServiceUnavailableException;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
@@ -41,29 +42,45 @@ import com.rabbitmq.client.Connection;
public class RabbitMqEventPublisher implements EventPublisher {
private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventPublisher.class);
- private final Connection connection;
- private final Channel channel;
+ private Connection connection;
+ private Channel channel;
private RabbitMqClientConfig config;
+
+ private long lastConnectionAttempt = 0L;
public RabbitMqEventPublisher(RabbitMqClientConfig config) throws Exception {
this.config = config;
-
+
if (config.getExchangeName() == null) {
throw new ConfigurationException("Mandatory config param exchangeName not set");
}
-
- this.connection = RabbitMqUtils.createConnection(config);
- this.channel = connection.createChannel();
- //Durable exchange and non delete
- channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments());
+ try {
+ createConnection();
+ }
+ catch (ConfigurationException ex) {
+ // If the configuration is bad, we may as well re-throw the exception and let the process die.
+ throw ex;
+ }
+ catch (Exception ex) {
+ // Otherwise, let the process live. We can retry establishing a connection later.
+ log.error(RabbitMqApplicationMsgs.MESSAGE_ERROR, "Unable to connect to RMQ: " + ex.getMessage());
+ return;
+ }
}
@Override
public void close() throws Exception {
- channel.close();
- connection.close();
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
}
@Override
@@ -88,6 +105,10 @@ public class RabbitMqEventPublisher implements EventPublisher {
@Override
public int sendSync(String message) throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
channel.basicPublish(config.getExchangeName(), config.getRoutingKey(), null, message.getBytes());
log.debug(" [x] Sent '" + message + "'");
return 1;
@@ -95,6 +116,10 @@ public class RabbitMqEventPublisher implements EventPublisher {
@Override
public int sendSync(Collection<String> messages) throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
log.debug("Publishing" + messages.size() + " messages ");
for (String message : messages) {
sendSync(message);
@@ -104,17 +129,54 @@ public class RabbitMqEventPublisher implements EventPublisher {
@Override
public int sendSync(String routingParam, String message) throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
channel.basicPublish(config.getExchangeName(), routingParam, null, message.getBytes());
return 1;
}
@Override
public int sendSync(String routingParam, Collection<String> messages) throws Exception {
+ if (connection == null) {
+ createConnection();
+ }
+
log.debug("Publishing" + messages.size() + " messages ");
for (String message : messages) {
sendSync(routingParam, message);
}
return messages.size();
}
+
+ private synchronized void createConnection() throws Exception {
+ if (connection != null) {
+ return;
+ }
+
+ long currentTime = System.currentTimeMillis();
+
+ if ( (currentTime - config.getRetryInterval()) < lastConnectionAttempt) {
+ log.warn(RabbitMqApplicationMsgs.MESSAGE_WARN, "Drop event. No connection to RMQ.");
+ throw new ServiceUnavailableException("Waiting for retry interval");
+ }
+
+ lastConnectionAttempt = currentTime;
+
+ try {
+ this.connection = RabbitMqUtils.createConnection(config);
+ this.channel = connection.createChannel();
+
+ //Durable exchange and non delete
+ channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments());
+ }
+ catch (Exception ex) {
+ close();
+ throw ex;
+ }
+
+ log.info(RabbitMqApplicationMsgs.MESSAGE_INFO, "Event publisher successfully connected to RMQ");
+ }
}
diff --git a/event-client-rabbitmq/src/main/resources/resources.properties b/event-client-rabbitmq/src/main/resources/resources.properties
new file mode 100644
index 0000000..1e148d2
--- /dev/null
+++ b/event-client-rabbitmq/src/main/resources/resources.properties
@@ -0,0 +1,38 @@
+#
+# ============LICENSE_START=======================================================
+# org.onap.aai
+# ================================================================================
+# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
+# Copyright © 2019 European Software Marketing Ltd.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+
+MESSAGE_INFO=\
+ |\
+ {0}|\
+ |\
+ |\
+
+MESSAGE_ERROR=\
+ |\
+ {0}|\
+ |\
+ |\
+
+MESSAGE_WARN=\
+ |\
+ {0}|\
+ |\
+ |\