From 6beb446925c967aca92f5513adf36c5db77c00d6 Mon Sep 17 00:00:00 2001 From: TATTAVARADA Date: Thu, 27 Apr 2017 07:53:18 -0400 Subject: [PORTAL-7] Rebase This rebasing includes common libraries and common overlays projects abstraction of components Change-Id: Ia1efa4deacdc5701e6205104ac021a6c80ed60ba Signed-off-by: st782s --- .../portalsdk/core/onboarding/ueb/Consumer.java | 155 +++++++++ .../core/onboarding/ueb/FunctionalMenu.java | 61 ++++ .../portalsdk/core/onboarding/ueb/Helper.java | 64 ++++ .../portalsdk/core/onboarding/ueb/Publisher.java | 125 +++++++ .../core/onboarding/ueb/PublisherList.java | 77 +++++ .../core/onboarding/ueb/TopicManager.java | 135 ++++++++ .../core/onboarding/ueb/UebException.java | 65 ++++ .../portalsdk/core/onboarding/ueb/UebManager.java | 358 +++++++++++++++++++++ .../portalsdk/core/onboarding/ueb/UebMsg.java | 119 +++++++ .../portalsdk/core/onboarding/ueb/UebMsgTypes.java | 28 ++ .../onboarding/ueb/WaitingRequestersQueueList.java | 73 +++++ 11 files changed, 1260 insertions(+) create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/FunctionalMenu.java create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Helper.java create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Publisher.java create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/PublisherList.java create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/TopicManager.java create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebException.java create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebManager.java create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebMsg.java create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebMsgTypes.java create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/WaitingRequestersQueueList.java (limited to 'ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb') diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java new file mode 100644 index 00000000..e9921d18 --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Consumer.java @@ -0,0 +1,155 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.LinkedList; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiConstants; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiProperties; + +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaConsumer; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Provides a consumer that reads messages from a UEB topic. Intended to be + * passed to a separate thread as its runnable object. + */ +public class Consumer implements Runnable { + + private final Log logger = LogFactory.getLog(getClass()); + + private final LinkedList urlList = Helper.uebUrlList(); + private final ConcurrentLinkedQueue queue; + private final WaitingRequestersQueueList waitingRequestersList; + private final String consumerKey, consumerSecret, topicName, consumerGroupName; + + /** + * Accepts coordinates needed to subscribe to a UEB topic, as well as the + * queues for passing along messages that arrive. + * + * @param consumerKey + * UEB key used to subscribe to the topic + * @param consumerSecret + * UEB secret used to subscribe to the topic + * @param topicName + * UEB topic name + * @param queue + * Queue to receive UEB messages. All inbound messages are + * enqueued here; ignored if null. + * @param waitingRequestersList + * Collection of queues to receive UEB messages that arrive in + * response to requests; i.e., emulating a synchronous request + * via pub/sub. + */ + public Consumer(String consumerKey, String consumerSecret, String topicName, String consumerGroupName, + ConcurrentLinkedQueue queue, WaitingRequestersQueueList waitingRequestersList) { + this.consumerKey = consumerKey; + this.consumerSecret = consumerSecret; + this.topicName = topicName; + this.consumerGroupName = consumerGroupName; + this.queue = queue; + this.waitingRequestersList = waitingRequestersList; + } + + /** + * Subscribes to a topic using credentials as supplied to the constructor. + * Distributes messages appropriately as they arrive: + *
    + *
  • If the queue is not null, adds the message to the queue. + *
  • If the message's getMsgId() method returns non-null and the ID is + * found in the collection of waiting requesters, adds the message in that + * requester's queue. + *
+ * + * This is intended to be called in a long running thread as a listener for + * any published messages on a topic. Typical async pub/sub model. We use a + * filter of "0" to prevent collisions with P2P messages with unique filter + * ids. + */ + protected void consume() throws IOException, UebException, GeneralSecurityException { + final String id = UUID.randomUUID().toString(); + + CambriaConsumer cc = null; + cc = new CambriaClientBuilders.ConsumerBuilder() + .usingHosts(urlList) + .authenticatedBy(consumerKey, consumerSecret) + .onTopic (topicName) + .knownAs (consumerGroupName,id) + .waitAtServer (15*1000) + .receivingAtMost (1000) + .build(); + + while (true) { + for (String msg : cc.fetch()) { + logger.debug(" <== consume from topicName " + topicName + " msg: " + msg); + UebMsg uebMsg = new ObjectMapper().readValue(msg, UebMsg.class); + if (queue != null) { + // Add to general queue allowing listeners to act on any + // incoming messages. We don't know if a listener is + // also going to be a responder to a synchronous + // request. So put all received messages on the general + // listener queue. + queue.add(uebMsg); + if (logger.isDebugEnabled()) + logger.debug("Added msg to queue " + this.queue + " queue count = " + queue.size() + " msg :" + + uebMsg.getPayload()); + } + if (waitingRequestersList != null && uebMsg.getMsgId() != null) { + // If a msgId is present, this could be a synchronous + // reply. Here we add it to the waiting requester's + // queue if we find a requester waiting for this msgId. + if (!(uebMsg.getMsgId() + .equals(PortalApiProperties.getProperty(PortalApiConstants.ECOMP_DEFAULT_MSG_ID)))) { + waitingRequestersList.addMsg(uebMsg.getMsgId(), uebMsg); + } + } + } + if (Thread.interrupted()) { + logger.warn(Thread.currentThread() + " interrupted, exiting"); + break; + } + Helper.sleep(10); + } + + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + try { + consume(); + } catch (Exception ex) { + Thread t = Thread.currentThread(); + t.getUncaughtExceptionHandler().uncaughtException(t, ex); + } + } + +} \ No newline at end of file diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/FunctionalMenu.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/FunctionalMenu.java new file mode 100644 index 00000000..7cf3e91c --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/FunctionalMenu.java @@ -0,0 +1,61 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Provides a convenience method for fetching the functional menu for a user + * from the ECOMP Portal via UEB. + */ +public class FunctionalMenu { + + private static final Log logger = LogFactory.getLog(FunctionalMenu.class); + + /** + * Makes a synchronous call to ECOMP Portal to get JSON with the functional + * menu, which arrives as the payload of the returned UEB message. + * + * @param userId + * User ID as known on the ECOMP Portal for customizing the + * functional menu appropriately + * @return JSON with functional menu + * @throws UebException + */ + public static String get(String userId) throws UebException { + String returnString = null; + logger.info("Making use of UEB communication and Requesting functional menu for user " + userId); + UebMsg funcMenuUebMsg = null; + UebMsg msg = new UebMsg(); + msg.putMsgType(UebMsgTypes.UEB_MSG_TYPE_GET_FUNC_MENU); + msg.putUserId(userId); + funcMenuUebMsg = UebManager.getInstance().requestReply(msg); + if (funcMenuUebMsg != null) { + if (funcMenuUebMsg.getPayload().startsWith("Error:")) { + logger.error("getFunctionalMenu received an error in UEB msg = " + funcMenuUebMsg.getPayload()); + } else { + returnString = funcMenuUebMsg.getPayload(); + } + } + return returnString; + } + +} \ No newline at end of file diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Helper.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Helper.java new file mode 100644 index 00000000..6e6b4ae8 --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Helper.java @@ -0,0 +1,64 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +import java.util.LinkedList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiConstants; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiProperties; + +/** + * Provides utility methods. + */ +public class Helper { + + private static final Log logger = LogFactory.getLog(Helper.class); + + /** + * Parses a comma-separated list of UEB servers from properties file into a + * list. + * + * @return List of UEB server names + */ + public static LinkedList uebUrlList() { + LinkedList urlList = null; + String url = PortalApiProperties.getProperty(PortalApiConstants.UEB_URL_LIST); + if (url == null) { + logger.error("uebUrlList: failed to get property " + PortalApiConstants.UEB_URL_LIST); + return null; + } + urlList = new LinkedList(); + for (String u : url.split(",")) { + urlList.add(u.trim()); + } + return urlList; + } + + public static void sleep(int milliseconds) { + try { + Thread.sleep(milliseconds); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + +} diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Publisher.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Publisher.java new file mode 100644 index 00000000..8dbdda2b --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Publisher.java @@ -0,0 +1,125 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.LinkedList; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiConstants; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; + +/** + * Provides a publisher that sends messages to a UEB topic. + * + * Utilizes AT&T's UEB/Cambria subscriber/publisher messaging service. + */ +public class Publisher { + + private final Log logger = LogFactory.getLog(getClass()); + + protected final LinkedList urlList = Helper.uebUrlList(); + + private final String topicName; + private final String publisherKey; + private final String publisherSecret; + + /** + * Accepts coordinates needed to publish to a UEB topic. + * + * @param publisherKey + * UEB key used to publish to the topic + * @param publisherSecret + * UEB secret used to publish to the topic + * @param topicName + * UEB topic name + */ + public Publisher(String publisherKey, String publisherSecret, String topicName) { + this.publisherKey = publisherKey; + this.publisherSecret = publisherSecret; + this.topicName = topicName; + logger.info("Publisher instantiated for topic " + topicName); + } + + /** + * Creates a publisher, subscribes to the topic, sends the specified message + * to the topic, then closes the publisher. This ensures that the single + * message goes immediately. UEB is designed for high throughput and tries + * to batch up multiple messages in each send, but this method wants the + * single message to go immediately. + * + * @param uebMsg + * Message object to send as the payload. + * @throws UebException + * If anything goes wrong, including JSON serialization of the + * specified message object. + */ + public void send(UebMsg uebMsg) throws UebException { + String msg = null; + + CambriaBatchingPublisher pub; + try { + pub = new CambriaClientBuilders.PublisherBuilder() + .authenticatedBy(publisherKey, publisherSecret).usingHosts(urlList).onTopic(topicName).build(); + } catch (MalformedURLException e1) { + logger.error("pub.build Exception ", e1); + throw new UebException(PortalApiConstants.ECOMP_UEB_UNKNOWN_PUBLISH_ERROR, e1, topicName, null, msg); + } catch (GeneralSecurityException e1) { + logger.error("pub.build Exception ", e1); + throw new UebException(PortalApiConstants.ECOMP_UEB_UNKNOWN_PUBLISH_ERROR, e1, topicName, null, msg); + } + + try { + ObjectWriter mapper = new ObjectMapper().writer().withDefaultPrettyPrinter(); + msg = mapper.writeValueAsString(uebMsg); + } catch (JsonProcessingException e) { + throw new UebException(PortalApiConstants.ECOMP_UEB_INVALID_MSG, topicName, null, null); + } + + try { + logger.debug("Publishing to " + topicName + " msg: " + msg); + int NumSent = pub.send(PortalApiConstants.ECOMP_GENERAL_UEB_PARTITION, msg); + if (NumSent == 0) { + throw new UebException(PortalApiConstants.ECOMP_UEB_UNKNOWN_PUBLISH_ERROR, topicName, null, msg); + } + } catch (IOException ex) { + logger.error("Failed to publish", ex); + throw new UebException(PortalApiConstants.ECOMP_UEB_UNKNOWN_PUBLISH_ERROR, ex, topicName, null, msg); + } + + try { + // close the publisher to make sure everything's sent before exiting + pub.close(5, TimeUnit.SECONDS); + } catch (Exception ex) { + logger.error("pub.close Exception ", ex); + throw new UebException(PortalApiConstants.ECOMP_UEB_UNKNOWN_PUBLISH_ERROR, ex, topicName, null, msg); + } + + } +} \ No newline at end of file diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/PublisherList.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/PublisherList.java new file mode 100644 index 00000000..6919236f --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/PublisherList.java @@ -0,0 +1,77 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A thin wrapper around ConcurrentHashMap that stores a queue for each + * Requester that is waiting for a Reply. When a reply is received that has a + * matching msgId, that requesters queue is populated with the reply message. + * + * Primarily for Portal core to track the remote applications that have placed + * requests; never used by those applications. + */ +public class PublisherList { + + private final Log logger = LogFactory.getLog(getClass()); + + private final Map map; + + public PublisherList() { + map = new ConcurrentHashMap<>(); + } + + public void addPublisherToMap(String topicName, Publisher publisher) { + if (this.map.containsKey(topicName)) { + logger.error("Publisher already exists for " + topicName); + } else { + this.map.put(topicName, publisher); + } + } + + public Publisher getPublisher(String topicName) { + return this.map.get(topicName); + } + + public void removePublisherFromMap(String topicName) { + this.map.remove(topicName); + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("Map contains " + this.map.size() + " Publishers."); + for (Map.Entry entry : this.map.entrySet()) { + String key = entry.getKey().toString(); + Publisher pub = entry.getValue(); + sb.append("Entry msgId, " + key + " publisher" + pub); + } + return sb.toString(); + } + + public int size() { + return this.map.size(); + } + +} \ No newline at end of file diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/TopicManager.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/TopicManager.java new file mode 100644 index 00000000..8593b5bb --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/TopicManager.java @@ -0,0 +1,135 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.LinkedList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiConstants; + +import com.att.nsa.apiClient.http.HttpClient; +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.cambria.client.CambriaClient.CambriaApiException; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientFactory; +import com.att.nsa.cambria.client.CambriaTopicManager; + +/** + * Provides methods to facilitate creating topics, and adding publishers and + * subscribers to existing topics. + * + * Utilizes UEB/Cambria subscriber/publisher messaging service. + */ +public class TopicManager { + + private final Log logger = LogFactory.getLog(getClass()); + + /** + * Creates a topic with the specified information. + * + * @param key + * Topic key + * @param secret + * Topic secret key + * @param topicName + * Topic name + * @param topicDescription + * Topic description + * @throws HttpException + * @throws CambriaApiException + * @throws IOException + */ + public void createTopic(String key, String secret, String topicName, String topicDescription) + throws HttpException, CambriaApiException, IOException { + final LinkedList urlList = Helper.uebUrlList(); + if (logger.isInfoEnabled()) { + logger.info("==> createTopic"); + logger.info("topicName: " + topicName); + logger.info("topicDescription: " + topicDescription); + } + CambriaTopicManager tm =null; + try { + tm = CambriaClientFactory.createTopicManager(null, urlList, key, secret); + } catch (GeneralSecurityException e) { + logger.error("pub.build Exception ", e); + throw new CambriaApiException(topicName); + } + tm.createTopic(topicName, topicDescription, 1, 1); + } + + /** + * Modifies the specified topic to accept a subscriber using the specified + * key. + * + * @param topicOwnerKey + * @param topicOwnerSecret + * @param subscriberKey + * @param topicName + * @throws HttpException + * @throws CambriaApiException + * @throws IOException + */ + public void addSubscriber(String topicOwnerKey, String topicOwnerSecret, String subscriberKey, String topicName) + throws HttpException, CambriaApiException, IOException { + logger.info("==> addSubscriber to topic " + topicName); + final LinkedList urlList = Helper.uebUrlList(); + CambriaTopicManager tm = null; + try { + tm = new CambriaClientBuilders.TopicManagerBuilder().usingHosts(urlList).authenticatedBy(topicOwnerKey, topicOwnerSecret).build(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + //old version num : CambriaClientFactory.createTopicManager(urlList, topicOwnerKey,topicOwnerSecret); + tm.allowConsumer(topicName, subscriberKey); + } + + /** + * Modifies the specified topic to accept a publisher using the specified + * key. + * + * @param topicOwnerKey + * @param topicOwnerSecret + * @param publisherKey + * @param topicName + * @throws HttpException + * @throws CambriaApiException + * @throws IOException + */ + + @SuppressWarnings("deprecation") + public void addPublisher(String topicOwnerKey, String topicOwnerSecret, String publisherKey, String topicName) + throws HttpException, CambriaApiException, IOException { + logger.info("==> addPublisher to topic " + topicName); + final LinkedList urlList = Helper.uebUrlList(); + CambriaTopicManager tm =null; + try { + tm = CambriaClientFactory.createTopicManager(HttpClient.ConnectionType.HTTPS, urlList, topicOwnerKey, + topicOwnerSecret); + } catch (GeneralSecurityException e) { + logger.error("pub.build Exception ", e); + throw new CambriaApiException(topicName); + } + tm.allowProducer(topicName, publisherKey); + } +} diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebException.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebException.java new file mode 100644 index 00000000..875ba58d --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebException.java @@ -0,0 +1,65 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +/** + * Stores UEB-specific information including topic, message ID and message body. + */ +public class UebException extends Exception { + + private static final long serialVersionUID = 1L; + private String topicName = null; + private String msgId = null; + private String msg = null; + + public UebException(String errorMsg, String topicName, String msgId, String msg) { + super(errorMsg); + this.topicName = topicName; + this.msgId = msgId; + this.msg = msg; + } + + public UebException(String errorMsg, Throwable ex, String topicName, String msgId, String msg) { + super(errorMsg, ex); + this.topicName = topicName; + this.msgId = msgId; + this.msg = msg; + } + + public UebException(String msg, Throwable ex) { + super(msg, ex); + } + + public UebException(Throwable ex) { + super(ex); + } + + public String getUebMsg() { + return this.msg; + } + + public String getTopicName() { + return this.topicName; + } + + public String getMsgId() { + return this.msgId; + } +} diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebManager.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebManager.java new file mode 100644 index 00000000..7bf6de35 --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebManager.java @@ -0,0 +1,358 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiConstants; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiProperties; + +/** + * Manages UEB interactions and provides methods for publishing requests, + * replies and others. + */ +public class UebManager { + + private final Log logger = LogFactory.getLog(getClass()); + + private WaitingRequestersQueueList waitingRequestersQueueList; + private PublisherList publisherList = new PublisherList(); + private static UebManager uebManager = null; + + private final String inTopicName; + private final String consumerGroupName; + private final String outTopicName; + private final String appUebKey; + private final String appUebSecret; + + private Publisher appPublisher; + private Thread listenerThread; + private boolean bThisIsEcompPortalServer = false; + + /** + * Constructor initializes fields and validates values obtained from + * properties. + * + * The picture below is a simplified view of the relationships among ECOMP + * Portal and applications communicating via UEB: + * + *
+	*                      ECOMP out to many.
+	*                      App out to only ECOMP.
+	*
+	*  |----------------|<---------------------------------------------   
+	*  |                |                                         | |  |
+	*  |                |---------------------------> App 1 ------  |  |
+	*  |  ECOMP Portal  |---------------------------> App 2 ---------  |
+	*  |                |                            ...               |
+	*  |                |---------------------------> App n -----------
+	*  |----------------|
+	 * 
+ * + * @throws IOException + */ + protected UebManager() throws UebException { + waitingRequestersQueueList = null; + listenerThread = null; + outTopicName = PortalApiProperties.getProperty(PortalApiConstants.ECOMP_PORTAL_INBOX_NAME); + inTopicName = PortalApiProperties.getProperty(PortalApiConstants.UEB_APP_INBOUND_MAILBOX_NAME); + appUebKey = PortalApiProperties.getProperty(PortalApiConstants.UEB_APP_KEY); + appUebSecret = PortalApiProperties.getProperty(PortalApiConstants.UEB_APP_SECRET); + String consGrp = PortalApiProperties.getProperty(PortalApiConstants.UEB_APP_CONSUMER_GROUP_NAME); + + if (outTopicName == null || outTopicName.length() == 0) + throw new UebException("Failed to get property " + PortalApiConstants.ECOMP_PORTAL_INBOX_NAME, null, null, + null); + if (inTopicName == null || inTopicName.length() == 0) + throw new UebException("Failed to get property " + PortalApiConstants.UEB_APP_INBOUND_MAILBOX_NAME, null, + null, null); + if (consGrp == null || consGrp.length() == 0) + throw new UebException("Failed to get property " + PortalApiConstants.UEB_APP_CONSUMER_GROUP_NAME, null, + null, null); + if (appUebKey == null || appUebKey.length() == 0) + throw new UebException("Failed to get property " + PortalApiConstants.UEB_APP_KEY, null, null, null); + if (appUebSecret == null || appUebSecret.length() == 0) + throw new UebException("Failed to get property " + PortalApiConstants.UEB_APP_SECRET, null, null, null); + List uebUrlList = Helper.uebUrlList(); + if (uebUrlList == null || uebUrlList.size() == 0) + throw new UebException("Failed to get property" + PortalApiConstants.UEB_URL_LIST, null, null, null); + // A bit of magic: if consumer group is a magic token, generate one. + consumerGroupName = (PortalApiConstants.UEB_APP_CONSUMER_GROUP_NAME_GENERATOR.equals(consGrp) + ? UUID.randomUUID().toString() : consGrp); + } + + /** + * Gets the static instance, creating it if necessary. + * + * @return Instance of UebManager + * @throws IOException + */ + public static synchronized UebManager getInstance() throws UebException { + if (uebManager == null) { + uebManager = new UebManager(); + } + return uebManager; + } + + /** + * Answers whether the getInstance() method has previously been called. + * + * @return True if a static instance is available, else false. + */ + public static boolean isInstanceAvailable() { + return uebManager != null; + } + + /** + * Creates a list of waiting requesters, creates and a consumer using cached + * information, and starts a new thread to run the consumer that listens for + * messages published to the inbound topic configured in the constructor. + * + * @param inboxQueue + * Queue supplied to the consumer. If not null, the consumer will + * enqueue every message it receives. + */ + public void initListener(ConcurrentLinkedQueue inboxQueue) throws UebException { + waitingRequestersQueueList = new WaitingRequestersQueueList(); + Consumer runnable = new Consumer(appUebKey, appUebSecret, inTopicName, consumerGroupName, inboxQueue, + waitingRequestersQueueList); + this.listenerThread = new Thread(runnable, "UEBConsumerThread"); + this.listenerThread.start(); + Helper.sleep(400); // UEB functions more reliably when we give this some + // time + + logger.info("UEBManager instance starting... " + inTopicName + " listener thread " + + this.listenerThread.getName() + " state = " + this.listenerThread.getState()); + + /* + * ECOMP Portal manages a dynamic list of outbound topics and so the + * outTopicName is initialized in this logic with the same value as the + * inbound topic. The real outbound topics name will be added to the + * publisher list for ECOMP Portal. For an SDK/App instance only one + * publisher is needed, appPublisher. + */ + if (inTopicName.equalsIgnoreCase(outTopicName)) { + this.bThisIsEcompPortalServer = true; + } else { + appPublisher = new Publisher(appUebKey, appUebSecret, outTopicName); + Helper.sleep(400); + } + } + + /** + * Creates and adds a publisher to the list for the specified topic. This + * should only be called by the ECOMP Portal App, other Apps have just one + * publisher and use appPublisher + * + * @param topicName + */ + public void addPublisher(String topicName) { + logger.info("UEBManager adding publisher for " + topicName); + Publisher outBoxToAppPublisher = new Publisher(appUebKey, appUebSecret, topicName); + publisherList.addPublisherToMap(topicName, outBoxToAppPublisher); + } + + /** + * Removes a publisher from the list for the specified topic. + * + * This should only be called by the ECOMP Portal App, other Apps have just + * one publisher and use appPublisher + * + * @param topicName + */ + public void removePublisher(String topicName) { + logger.info("UEBManager removing publisher for " + topicName); + publisherList.removePublisherFromMap(topicName); + } + + /** + * Adds the default ECOMP message ID to the message and sends the message to + * the topic. + * + * @param msg + * @throws UebException + */ + public void publish(UebMsg msg) throws UebException { + msg.putMsgId(PortalApiConstants.ECOMP_DEFAULT_MSG_ID); + appPublisher.send(msg); + } + + /** + * Sends the message using the default publisher. + * + * @param msg + * @throws UebException + */ + public void publishReply(UebMsg msg) throws UebException { + // Caller populates msgId with the echoed value from the request + appPublisher.send(msg); + } + + /** + * Sends the message using the appropriate publisher for the specified + * topic. + * + * @param msg + * @param topicName + * @throws UebException + */ + public void publishEP(UebMsg msg, String topicName) throws UebException { + Publisher publisher = publisherList.getPublisher(topicName); + if (publisher != null) { + msg.putMsgId(PortalApiConstants.ECOMP_DEFAULT_MSG_ID); + publisher.send(msg); + } + } + + /** + * Publishes a reply using the appropriate publisher for the specified + * topic. + * + * @param msg + * @param topicName + * @throws UebException + */ + public void publishReplyEP(UebMsg msg, String topicName) throws UebException { + // Caller populates msgId with the echoed value from the request + Publisher publisher = publisherList.getPublisher(topicName); + if (publisher != null) { + publisher.send(msg); + } + } + + /** + * Sends the specified message using the specified publisher, and waits for + * a reply. Retransmits if no reply is received in 5 seconds; gives up after + * 3 retries. + * + * @param msg + * @param publisher + * @return Message from a remote publisher, or null if timeout happens. + * @throws UebException + */ + public UebMsg requestReplyUsingPublisher(UebMsg msg, Publisher publisher) throws UebException { + UebMsg reply = null; + if (waitingRequestersQueueList == null) { + logger.error("requestReplyUsingPublisher called but listener thread not initialized"); + } else { + // Storing a non-default message ID identifies this as a synchronous + // request + msg.putMsgId(UUID.randomUUID().toString()); + + /* + * Create a queue for this request, the consumer thread will insert + * the reply on this queue + */ + LinkedBlockingQueue replyQueue = new LinkedBlockingQueue(); + waitingRequestersQueueList.addQueueToMap(msg.getMsgId(), replyQueue); + + /* + * Send the request + */ + publisher.send(msg); + + /* + * Wait for reply up to 3 * 5 = 15 seconds + */ + int reTransmits = 0; + int maxRetransmits = 3; + int retransmitTimeMs = 5000; + long sendTimeStamp = System.currentTimeMillis(); + while (reTransmits < maxRetransmits) { + if ((reply = replyQueue.poll()) != null) + break; + + long now = System.currentTimeMillis(); + if (now - sendTimeStamp > retransmitTimeMs) { + logger.debug("Retransmitting send... msg = " + msg.getPayload() + msg.getMsgId()); + publisher.send(msg); + sendTimeStamp = System.currentTimeMillis(); + reTransmits++; + } + } + waitingRequestersQueueList.removeQueueFromMap(msg.getMsgId()); + if (reTransmits == maxRetransmits) + throw new UebException(PortalApiConstants.ECOMP_UEB_TIMEOUT_ERROR, inTopicName, null, msg.toString()); + + } + return reply; + } + + /** + * Sends the specified message using the default publisher and waits for a + * reply. + * + * @param msg + * @return Message from a remote publisher, or null if timeout happens. + * @throws UebException + */ + public UebMsg requestReply(UebMsg msg) throws UebException { + return requestReplyUsingPublisher(msg, appPublisher); + } + + /** + * Sends the specified message using the publisher appropriate for the + * specified topic name, and waits for a reply. + * + * @param msg + * @param topicName + * @return Message from a remote publisher, or null if timeout happens. + * @throws UebException + */ + public UebMsg requestReplyEP(UebMsg msg, String topicName) throws UebException { + UebMsg returnMsg = null; + Publisher publisher = publisherList.getPublisher(topicName); + if (publisher != null) { + returnMsg = requestReplyUsingPublisher(msg, publisher); + } + return returnMsg; + } + + /** + * Publishes the payload as a UEB widget-notification message on the default + * publisher. Intended for use by Apps inter widget communication, not EP + * itself. + * + * @param payload + * @param userId + */ + public void postWidgetNotification(String payload, String userId) throws UebException { + UebMsg msg = new UebMsg(); + msg.putPayload(payload); + msg.putUserId(userId); + msg.putMsgType(UebMsgTypes.UEB_MSG_TYPE_WIDGET_NOTIFICATION); + this.publish(msg); + } + + /** + * Interrupts the long-running thread that runs the consumer. + */ + public void shutdown() { + if (this.listenerThread != null) { + this.listenerThread.interrupt(); + } + } +} diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebMsg.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebMsg.java new file mode 100644 index 00000000..da18c4f8 --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebMsg.java @@ -0,0 +1,119 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiConstants; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiProperties; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class UebMsg { + + private final Log logger = LogFactory.getLog(getClass()); + + private String version; + private String msgId; + private long timeStamp; + private String payload; + private String msgType; + private String userId; + private String sourceTopicName; + private String sourceIP; + private String sourceHostName; + + /** + * Creates a new object and populates the fields source IP, source topic, + * time stamp, version, and message id. + */ + public UebMsg() { + InetAddress ip; + try { + ip = InetAddress.getLocalHost(); + // Do not attempt to get name, why wait on DNS every time? + // sourceHostName = ip.getHostName(); + sourceIP = ip.getHostAddress(); + } catch (UnknownHostException e) { + sourceHostName = "unknown"; + sourceIP = "unknown"; + } + + this.timeStamp = System.currentTimeMillis(); + this.version = "1.0"; + this.msgId = PortalApiConstants.ECOMP_DEFAULT_MSG_ID; + this.payload = "empty payload content"; + this.sourceTopicName = PortalApiProperties.getProperty(PortalApiConstants.UEB_APP_INBOUND_MAILBOX_NAME); + if (this.sourceTopicName == null) + logger.error("Failed to get property " + PortalApiConstants.UEB_APP_INBOUND_MAILBOX_NAME); + } + + public void putMsgId(String msgId) { + this.msgId = msgId; + } + + public String getMsgId() { + return msgId; + } + + public void putPayload(String payload) { + this.payload = payload; + } + + public String getPayload() { + return payload; + } + + public void putMsgType(String msgType) { + this.msgType = msgType; + } + + public String getMsgType() { + return this.msgType; + } + + public void putUserId(String userId) { + this.userId = userId; + } + + public String getUserId() { + return this.userId; + } + + public void putSourceTopicName(String topic) { + this.sourceTopicName = topic; + } + + public String getSourceTopicName() { + return this.sourceTopicName; + } + + @Override + public String toString() { + return "UebMsg [version=" + version + ", msgId=" + msgId + ", timeStamp=" + timeStamp + ", msgType=" + msgType + + ", userId=" + userId + ", sourceTopicName=" + sourceTopicName + ", sourceIP=" + sourceIP + + ", sourceHostName=" + sourceHostName + "]" + System.lineSeparator() + "payload=" + payload; + } + +} diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebMsgTypes.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebMsgTypes.java new file mode 100644 index 00000000..28ab327f --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/UebMsgTypes.java @@ -0,0 +1,28 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +/** + * Publishes constants used in the UEB package. + */ +public interface UebMsgTypes { + public static final String UEB_MSG_TYPE_GET_FUNC_MENU = "uebMsgTypeGetFuncMenu"; + public static final String UEB_MSG_TYPE_WIDGET_NOTIFICATION = "uebMsgTypeWidgetNotification"; +} diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/WaitingRequestersQueueList.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/WaitingRequestersQueueList.java new file mode 100644 index 00000000..db0ee1b9 --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/WaitingRequestersQueueList.java @@ -0,0 +1,73 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A thin wrapper around ConcurrentHashMap that stores a queue for each + * Requester that is waiting for a Reply. When a reply is received that has a + * matching msgId, that requesters queue is populated with the reply message. + * + * Primarily for the UebManager to track requests while it waits for responses. + */ +public class WaitingRequestersQueueList { + private final Log logger = LogFactory.getLog(getClass()); + + private final Map> map; + + public WaitingRequestersQueueList() { + map = new ConcurrentHashMap<>(); + } + + public void addQueueToMap(String msgId, LinkedBlockingQueue queue) { + this.map.put(msgId, queue); + } + + public void addMsg(String msgId, UebMsg message) { + LinkedBlockingQueue queue = this.map.get(msgId); + if (queue != null) { + queue.add(message); + } else { + logger.warn("Did not find entry in WaitingRequestersQueueList for msgId " + msgId); + } + } + + public void removeQueueFromMap(String msgId) { + this.map.remove(msgId); + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("Map contains " + this.map.size() + " Publishers."); + for (Map.Entry> entry : this.map.entrySet()) { + String key = entry.getKey().toString(); + LinkedBlockingQueue queue = entry.getValue(); + sb.append("Entry msgId, " + key + " queue " + queue); + } + return sb.toString(); + } + +} -- cgit 1.2.3-korg