diff options
Diffstat (limited to 'ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java')
-rw-r--r-- | ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java | 164 |
1 files changed, 0 insertions, 164 deletions
diff --git a/ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java b/ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java deleted file mode 100644 index bb152294..00000000 --- a/ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java +++ /dev/null @@ -1,164 +0,0 @@ -/*- - * ================================================================================ - * eCOMP Portal SDK - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ================================================================================ - */ -package org.openecomp.portalsdk.core.onboarding.ueb; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.LinkedList; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.openecomp.portalsdk.core.onboarding.crossapi.PortalApiConstants; -import org.openecomp.portalsdk.core.onboarding.crossapi.PortalApiProperties; - -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientFactory; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * Provides a consumer that reads messages from a UEB topic. Intended to be - * passed to a separate thread as its runnable object. - * - */ -public class Consumer implements Runnable { - - private final Log logger = LogFactory.getLog(getClass()); - - private final LinkedList<String> urlList = Helper.uebUrlList(); - private final ConcurrentLinkedQueue<UebMsg> queue; - private final WaitingRequestersQueueList waitingRequestersList; - private final String consumerKey, consumerSecret, topicName, consumerGroupName; - - /** - * Accepts coordinates needed to subscribe to a UEB topic, as well as the - * queues for passing along messages that arrive. - * - * @param consumerKey - * UEB key used to subscribe to the topic - * @param consumerSecret - * UEB secret used to subscribe to the topic - * @param topicName - * UEB topic name - * @param queue - * Queue to receive UEB messages. All inbound messages are - * enqueued here; ignored if null. - * @param waitingRequestersList - * Collection of queues to receive UEB messages that arrive in - * response to requests; i.e., emulating a synchronous request - * via pub/sub. - */ - public Consumer(String consumerKey, String consumerSecret, String topicName, String consumerGroupName, - ConcurrentLinkedQueue<UebMsg> queue, WaitingRequestersQueueList waitingRequestersList) { - this.consumerKey = consumerKey; - this.consumerSecret = consumerSecret; - this.topicName = topicName; - this.consumerGroupName = consumerGroupName; - this.queue = queue; - this.waitingRequestersList = waitingRequestersList; - } - - /** - * Subscribes to a topic using credentials as supplied to the constructor. - * Distributes messages appropriately as they arrive: - * <UL> - * <LI>If the queue is not null, adds the message to the queue. - * <LI>If the message's getMsgId() method returns non-null and the ID is - * found in the collection of waiting requesters, adds the message in that - * requester's queue. - * </UL> - * - * This is intended to be called in a long running thread as a listener for - * any published messages on a topic. Typical async pub/sub model. We use a - * filter of "0" to prevent collisions with P2P messages with unique filter - * ids. - */ - protected void consume() throws IOException, UebException { - final String id = UUID.randomUUID().toString(); - - CambriaConsumer cc = null; - try { - cc = new CambriaClientBuilders.ConsumerBuilder() - .usingHosts(urlList) - .authenticatedBy(consumerKey, consumerSecret) - .onTopic (topicName) - .knownAs (consumerGroupName,id) - .waitAtServer (15*1000) - .receivingAtMost (1000) - .build(); - } catch (GeneralSecurityException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - //CambriaClientFactory.createConsumer(urlList, topicName, consumerGroupName, id, - //15 * 1000, 1000, null, consumerKey, consumerSecret); - - while (true) { - for (String msg : cc.fetch()) { - logger.debug(" <== consume from topicName " + topicName + " msg: " + msg); - UebMsg uebMsg = new ObjectMapper().readValue(msg, UebMsg.class); - if (queue != null) { - // Add to general queue allowing listeners to act on any - // incoming messages. We don't know if a listener is - // also going to be a responder to a synchronous - // request. So put all received messages on the general - // listener queue. - queue.add(uebMsg); - if (logger.isDebugEnabled()) - logger.debug("Added msg to queue " + this.queue + " queue count = " + queue.size() + " msg :" - + uebMsg.getPayload()); - } - if (waitingRequestersList != null && uebMsg.getMsgId() != null) { - // If a msgId is present, this could be a synchronous - // reply. Here we add it to the waiting requester's - // queue if we find a requester waiting for this msgId. - if (!(uebMsg.getMsgId() - .equals(PortalApiProperties.getProperty(PortalApiConstants.ECOMP_DEFAULT_MSG_ID)))) { - waitingRequestersList.addMsg(uebMsg.getMsgId(), uebMsg); - } - } - } - if (Thread.interrupted()) { - logger.warn(Thread.currentThread() + " interrupted, exiting"); - break; - } - Helper.sleep(10); - } - - } - - /* - * (non-Javadoc) - * - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - try { - consume(); - } catch (Exception ex) { - Thread t = Thread.currentThread(); - t.getUncaughtExceptionHandler().uncaughtException(t, ex); - } - } - -} |