summaryrefslogtreecommitdiffstats
path: root/ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java
diff options
context:
space:
mode:
authorTATTAVARADA <statta@research.att.com>2017-04-27 07:53:18 -0400
committerst782s <statta@research.att.com>2017-04-27 08:31:27 -0400
commit6beb446925c967aca92f5513adf36c5db77c00d6 (patch)
tree9392057ed0739de2445c5b2a2a8bee6dcdacbcf7 /ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java
parent246b225194e3e8dc1926294de591a94fd9787fa7 (diff)
[PORTAL-7] Rebase
This rebasing includes common libraries and common overlays projects abstraction of components Change-Id: Ia1efa4deacdc5701e6205104ac021a6c80ed60ba Signed-off-by: st782s <statta@research.att.com>
Diffstat (limited to 'ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java')
-rw-r--r--ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java164
1 files changed, 0 insertions, 164 deletions
diff --git a/ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java b/ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java
deleted file mode 100644
index bb152294..00000000
--- a/ecomp-sdk/thirdparty/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*-
- * ================================================================================
- * eCOMP Portal SDK
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ================================================================================
- */
-package org.openecomp.portalsdk.core.onboarding.ueb;
-
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.util.LinkedList;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.openecomp.portalsdk.core.onboarding.crossapi.PortalApiConstants;
-import org.openecomp.portalsdk.core.onboarding.crossapi.PortalApiProperties;
-
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientFactory;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Provides a consumer that reads messages from a UEB topic. Intended to be
- * passed to a separate thread as its runnable object.
- *
- */
-public class Consumer implements Runnable {
-
- private final Log logger = LogFactory.getLog(getClass());
-
- private final LinkedList<String> urlList = Helper.uebUrlList();
- private final ConcurrentLinkedQueue<UebMsg> queue;
- private final WaitingRequestersQueueList waitingRequestersList;
- private final String consumerKey, consumerSecret, topicName, consumerGroupName;
-
- /**
- * Accepts coordinates needed to subscribe to a UEB topic, as well as the
- * queues for passing along messages that arrive.
- *
- * @param consumerKey
- * UEB key used to subscribe to the topic
- * @param consumerSecret
- * UEB secret used to subscribe to the topic
- * @param topicName
- * UEB topic name
- * @param queue
- * Queue to receive UEB messages. All inbound messages are
- * enqueued here; ignored if null.
- * @param waitingRequestersList
- * Collection of queues to receive UEB messages that arrive in
- * response to requests; i.e., emulating a synchronous request
- * via pub/sub.
- */
- public Consumer(String consumerKey, String consumerSecret, String topicName, String consumerGroupName,
- ConcurrentLinkedQueue<UebMsg> queue, WaitingRequestersQueueList waitingRequestersList) {
- this.consumerKey = consumerKey;
- this.consumerSecret = consumerSecret;
- this.topicName = topicName;
- this.consumerGroupName = consumerGroupName;
- this.queue = queue;
- this.waitingRequestersList = waitingRequestersList;
- }
-
- /**
- * Subscribes to a topic using credentials as supplied to the constructor.
- * Distributes messages appropriately as they arrive:
- * <UL>
- * <LI>If the queue is not null, adds the message to the queue.
- * <LI>If the message's getMsgId() method returns non-null and the ID is
- * found in the collection of waiting requesters, adds the message in that
- * requester's queue.
- * </UL>
- *
- * This is intended to be called in a long running thread as a listener for
- * any published messages on a topic. Typical async pub/sub model. We use a
- * filter of "0" to prevent collisions with P2P messages with unique filter
- * ids.
- */
- protected void consume() throws IOException, UebException {
- final String id = UUID.randomUUID().toString();
-
- CambriaConsumer cc = null;
- try {
- cc = new CambriaClientBuilders.ConsumerBuilder()
- .usingHosts(urlList)
- .authenticatedBy(consumerKey, consumerSecret)
- .onTopic (topicName)
- .knownAs (consumerGroupName,id)
- .waitAtServer (15*1000)
- .receivingAtMost (1000)
- .build();
- } catch (GeneralSecurityException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- //CambriaClientFactory.createConsumer(urlList, topicName, consumerGroupName, id,
- //15 * 1000, 1000, null, consumerKey, consumerSecret);
-
- while (true) {
- for (String msg : cc.fetch()) {
- logger.debug(" <== consume from topicName " + topicName + " msg: " + msg);
- UebMsg uebMsg = new ObjectMapper().readValue(msg, UebMsg.class);
- if (queue != null) {
- // Add to general queue allowing listeners to act on any
- // incoming messages. We don't know if a listener is
- // also going to be a responder to a synchronous
- // request. So put all received messages on the general
- // listener queue.
- queue.add(uebMsg);
- if (logger.isDebugEnabled())
- logger.debug("Added msg to queue " + this.queue + " queue count = " + queue.size() + " msg :"
- + uebMsg.getPayload());
- }
- if (waitingRequestersList != null && uebMsg.getMsgId() != null) {
- // If a msgId is present, this could be a synchronous
- // reply. Here we add it to the waiting requester's
- // queue if we find a requester waiting for this msgId.
- if (!(uebMsg.getMsgId()
- .equals(PortalApiProperties.getProperty(PortalApiConstants.ECOMP_DEFAULT_MSG_ID)))) {
- waitingRequestersList.addMsg(uebMsg.getMsgId(), uebMsg);
- }
- }
- }
- if (Thread.interrupted()) {
- logger.warn(Thread.currentThread() + " interrupted, exiting");
- break;
- }
- Helper.sleep(10);
- }
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- try {
- consume();
- } catch (Exception ex) {
- Thread t = Thread.currentThread();
- t.getUncaughtExceptionHandler().uncaughtException(t, ex);
- }
- }
-
-}