summaryrefslogtreecommitdiffstats
path: root/ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java')
-rw-r--r--ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java71
1 files changed, 34 insertions, 37 deletions
diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java b/ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java
index 6f76d1aa..be8e71f8 100644
--- a/ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java
+++ b/ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java
@@ -6,7 +6,7 @@
* ===================================================================
*
* Unless otherwise specified, all software contained herein is licensed
- * under the Apache License, Version 2.0 (the “License”);
+ * under the Apache License, Version 2.0 (the "License");
* you may not use this software except in compliance with the License.
* You may obtain a copy of the License at
*
@@ -19,7 +19,7 @@
* limitations under the License.
*
* Unless otherwise specified, all documentation contained herein is licensed
- * under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+ * under the Creative Commons License, Attribution 4.0 Intl. (the "License");
* you may not use this documentation except in compliance with the License.
* You may obtain a copy of the License at
*
@@ -39,9 +39,9 @@ package org.onap.portalsdk.core.onboarding.ueb;
import java.io.IOException;
import java.security.GeneralSecurityException;
-import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,14 +60,17 @@ public class Consumer implements Runnable {
private final Log logger = LogFactory.getLog(getClass());
- private final LinkedList<String> urlList = Helper.uebUrlList();
- private final ConcurrentLinkedQueue<UebMsg> queue;
+ private final List<String> urlList = Helper.uebUrlList();
+ private final Queue<UebMsg> queue;
private final WaitingRequestersQueueList waitingRequestersList;
- private final String consumerKey, consumerSecret, topicName, consumerGroupName;
+ private final String consumerKey;
+ private final String consumerSecret;
+ private final String topicName;
+ private final String consumerGroupName;
/**
- * Accepts coordinates needed to subscribe to a UEB topic, as well as the
- * queues for passing along messages that arrive.
+ * Accepts coordinates needed to subscribe to a UEB topic, as well as the queues
+ * for passing along messages that arrive.
*
* @param consumerKey
* UEB key used to subscribe to the topic
@@ -75,16 +78,18 @@ public class Consumer implements Runnable {
* UEB secret used to subscribe to the topic
* @param topicName
* UEB topic name
+ * @param consumerGroupName
+ * consumer group name
* @param queue
- * Queue to receive UEB messages. All inbound messages are
- * enqueued here; ignored if null.
+ * Queue to receive UEB messages. All inbound messages are enqueued
+ * here; ignored if null.
* @param waitingRequestersList
* Collection of queues to receive UEB messages that arrive in
- * response to requests; i.e., emulating a synchronous request
- * via pub/sub.
+ * response to requests; i.e., emulating a synchronous request via
+ * pub/sub.
*/
public Consumer(String consumerKey, String consumerSecret, String topicName, String consumerGroupName,
- ConcurrentLinkedQueue<UebMsg> queue, WaitingRequestersQueueList waitingRequestersList) {
+ Queue<UebMsg> queue, WaitingRequestersQueueList waitingRequestersList) {
this.consumerKey = consumerKey;
this.consumerSecret = consumerSecret;
this.topicName = topicName;
@@ -98,28 +103,21 @@ public class Consumer implements Runnable {
* Distributes messages appropriately as they arrive:
* <UL>
* <LI>If the queue is not null, adds the message to the queue.
- * <LI>If the message's getMsgId() method returns non-null and the ID is
- * found in the collection of waiting requesters, adds the message in that
- * requester's queue.
+ * <LI>If the message's getMsgId() method returns non-null and the ID is found
+ * in the collection of waiting requesters, adds the message in that requester's
+ * queue.
* </UL>
*
- * This is intended to be called in a long running thread as a listener for
- * any published messages on a topic. Typical async pub/sub model. We use a
- * filter of "0" to prevent collisions with P2P messages with unique filter
- * ids.
+ * This is intended to be called in a long running thread as a listener for any
+ * published messages on a topic. Typical async pub/sub model. We use a filter
+ * of "0" to prevent collisions with P2P messages with unique filter ids.
*/
protected void consume() throws IOException, UebException, GeneralSecurityException {
final String id = UUID.randomUUID().toString();
- CambriaConsumer cc = null;
- cc = new CambriaClientBuilders.ConsumerBuilder()
- .usingHosts(urlList)
- .authenticatedBy(consumerKey, consumerSecret)
- .onTopic (topicName)
- .knownAs (consumerGroupName,id)
- .waitAtServer (15*1000)
- .receivingAtMost (1000)
- .build();
+ CambriaConsumer cc = new CambriaClientBuilders.ConsumerBuilder().usingHosts(urlList)
+ .authenticatedBy(consumerKey, consumerSecret).onTopic(topicName).knownAs(consumerGroupName, id)
+ .waitAtServer(15 * 1000).receivingAtMost(1000).build();
while (true) {
for (String msg : cc.fetch()) {
@@ -133,17 +131,16 @@ public class Consumer implements Runnable {
// listener queue.
queue.add(uebMsg);
if (logger.isDebugEnabled())
- logger.debug("Added msg to queue " + this.queue + " queue count = " + queue.size() + " msg :"
- + uebMsg.getPayload());
+ logger.debug("Added msg to queue " + this.queue + " msg :" + uebMsg.getPayload());
}
- if (waitingRequestersList != null && uebMsg.getMsgId() != null) {
+ if (waitingRequestersList != null //
+ && uebMsg.getMsgId() != null //
+ && !(uebMsg.getMsgId()
+ .equals(PortalApiProperties.getProperty(PortalApiConstants.ECOMP_DEFAULT_MSG_ID)))) {
// If a msgId is present, this could be a synchronous
// reply. Here we add it to the waiting requester's
// queue if we find a requester waiting for this msgId.
- if (!(uebMsg.getMsgId()
- .equals(PortalApiProperties.getProperty(PortalApiConstants.ECOMP_DEFAULT_MSG_ID)))) {
- waitingRequestersList.addMsg(uebMsg.getMsgId(), uebMsg);
- }
+ waitingRequestersList.addMsg(uebMsg.getMsgId(), uebMsg);
}
}
if (Thread.interrupted()) {