diff options
Diffstat (limited to 'ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java')
-rw-r--r-- | ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java | 71 |
1 files changed, 34 insertions, 37 deletions
diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java b/ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java index 6f76d1aa..be8e71f8 100644 --- a/ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java +++ b/ecomp-sdk/epsdk-core/src/main/java/org/onap/portalsdk/core/onboarding/ueb/Consumer.java @@ -6,7 +6,7 @@ * =================================================================== * * Unless otherwise specified, all software contained herein is licensed - * under the Apache License, Version 2.0 (the “License”); + * under the Apache License, Version 2.0 (the "License"); * you may not use this software except in compliance with the License. * You may obtain a copy of the License at * @@ -19,7 +19,7 @@ * limitations under the License. * * Unless otherwise specified, all documentation contained herein is licensed - * under the Creative Commons License, Attribution 4.0 Intl. (the “License”); + * under the Creative Commons License, Attribution 4.0 Intl. (the "License"); * you may not use this documentation except in compliance with the License. * You may obtain a copy of the License at * @@ -39,9 +39,9 @@ package org.onap.portalsdk.core.onboarding.ueb; import java.io.IOException; import java.security.GeneralSecurityException; -import java.util.LinkedList; +import java.util.List; +import java.util.Queue; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,14 +60,17 @@ public class Consumer implements Runnable { private final Log logger = LogFactory.getLog(getClass()); - private final LinkedList<String> urlList = Helper.uebUrlList(); - private final ConcurrentLinkedQueue<UebMsg> queue; + private final List<String> urlList = Helper.uebUrlList(); + private final Queue<UebMsg> queue; private final WaitingRequestersQueueList waitingRequestersList; - private final String consumerKey, consumerSecret, topicName, consumerGroupName; + private final String consumerKey; + private final String consumerSecret; + private final String topicName; + private final String consumerGroupName; /** - * Accepts coordinates needed to subscribe to a UEB topic, as well as the - * queues for passing along messages that arrive. + * Accepts coordinates needed to subscribe to a UEB topic, as well as the queues + * for passing along messages that arrive. * * @param consumerKey * UEB key used to subscribe to the topic @@ -75,16 +78,18 @@ public class Consumer implements Runnable { * UEB secret used to subscribe to the topic * @param topicName * UEB topic name + * @param consumerGroupName + * consumer group name * @param queue - * Queue to receive UEB messages. All inbound messages are - * enqueued here; ignored if null. + * Queue to receive UEB messages. All inbound messages are enqueued + * here; ignored if null. * @param waitingRequestersList * Collection of queues to receive UEB messages that arrive in - * response to requests; i.e., emulating a synchronous request - * via pub/sub. + * response to requests; i.e., emulating a synchronous request via + * pub/sub. */ public Consumer(String consumerKey, String consumerSecret, String topicName, String consumerGroupName, - ConcurrentLinkedQueue<UebMsg> queue, WaitingRequestersQueueList waitingRequestersList) { + Queue<UebMsg> queue, WaitingRequestersQueueList waitingRequestersList) { this.consumerKey = consumerKey; this.consumerSecret = consumerSecret; this.topicName = topicName; @@ -98,28 +103,21 @@ public class Consumer implements Runnable { * Distributes messages appropriately as they arrive: * <UL> * <LI>If the queue is not null, adds the message to the queue. - * <LI>If the message's getMsgId() method returns non-null and the ID is - * found in the collection of waiting requesters, adds the message in that - * requester's queue. + * <LI>If the message's getMsgId() method returns non-null and the ID is found + * in the collection of waiting requesters, adds the message in that requester's + * queue. * </UL> * - * This is intended to be called in a long running thread as a listener for - * any published messages on a topic. Typical async pub/sub model. We use a - * filter of "0" to prevent collisions with P2P messages with unique filter - * ids. + * This is intended to be called in a long running thread as a listener for any + * published messages on a topic. Typical async pub/sub model. We use a filter + * of "0" to prevent collisions with P2P messages with unique filter ids. */ protected void consume() throws IOException, UebException, GeneralSecurityException { final String id = UUID.randomUUID().toString(); - CambriaConsumer cc = null; - cc = new CambriaClientBuilders.ConsumerBuilder() - .usingHosts(urlList) - .authenticatedBy(consumerKey, consumerSecret) - .onTopic (topicName) - .knownAs (consumerGroupName,id) - .waitAtServer (15*1000) - .receivingAtMost (1000) - .build(); + CambriaConsumer cc = new CambriaClientBuilders.ConsumerBuilder().usingHosts(urlList) + .authenticatedBy(consumerKey, consumerSecret).onTopic(topicName).knownAs(consumerGroupName, id) + .waitAtServer(15 * 1000).receivingAtMost(1000).build(); while (true) { for (String msg : cc.fetch()) { @@ -133,17 +131,16 @@ public class Consumer implements Runnable { // listener queue. queue.add(uebMsg); if (logger.isDebugEnabled()) - logger.debug("Added msg to queue " + this.queue + " queue count = " + queue.size() + " msg :" - + uebMsg.getPayload()); + logger.debug("Added msg to queue " + this.queue + " msg :" + uebMsg.getPayload()); } - if (waitingRequestersList != null && uebMsg.getMsgId() != null) { + if (waitingRequestersList != null // + && uebMsg.getMsgId() != null // + && !(uebMsg.getMsgId() + .equals(PortalApiProperties.getProperty(PortalApiConstants.ECOMP_DEFAULT_MSG_ID)))) { // If a msgId is present, this could be a synchronous // reply. Here we add it to the waiting requester's // queue if we find a requester waiting for this msgId. - if (!(uebMsg.getMsgId() - .equals(PortalApiProperties.getProperty(PortalApiConstants.ECOMP_DEFAULT_MSG_ID)))) { - waitingRequestersList.addMsg(uebMsg.getMsgId(), uebMsg); - } + waitingRequestersList.addMsg(uebMsg.getMsgId(), uebMsg); } } if (Thread.interrupted()) { |