From 6beb446925c967aca92f5513adf36c5db77c00d6 Mon Sep 17 00:00:00 2001 From: TATTAVARADA Date: Thu, 27 Apr 2017 07:53:18 -0400 Subject: [PORTAL-7] Rebase This rebasing includes common libraries and common overlays projects abstraction of components Change-Id: Ia1efa4deacdc5701e6205104ac021a6c80ed60ba Signed-off-by: st782s --- .../portalsdk/core/onboarding/ueb/Publisher.java | 125 +++++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Publisher.java (limited to 'ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Publisher.java') diff --git a/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Publisher.java b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Publisher.java new file mode 100644 index 00000000..8dbdda2b --- /dev/null +++ b/ecomp-sdk/epsdk-core/src/main/java/org/openecomp/portalsdk/core/onboarding/ueb/Publisher.java @@ -0,0 +1,125 @@ +/*- + * ================================================================================ + * ECOMP Portal SDK + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ================================================================================ + */ +package org.openecomp.portalsdk.core.onboarding.ueb; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.LinkedList; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.openecomp.portalsdk.core.onboarding.util.PortalApiConstants; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; + +/** + * Provides a publisher that sends messages to a UEB topic. + * + * Utilizes AT&T's UEB/Cambria subscriber/publisher messaging service. + */ +public class Publisher { + + private final Log logger = LogFactory.getLog(getClass()); + + protected final LinkedList urlList = Helper.uebUrlList(); + + private final String topicName; + private final String publisherKey; + private final String publisherSecret; + + /** + * Accepts coordinates needed to publish to a UEB topic. + * + * @param publisherKey + * UEB key used to publish to the topic + * @param publisherSecret + * UEB secret used to publish to the topic + * @param topicName + * UEB topic name + */ + public Publisher(String publisherKey, String publisherSecret, String topicName) { + this.publisherKey = publisherKey; + this.publisherSecret = publisherSecret; + this.topicName = topicName; + logger.info("Publisher instantiated for topic " + topicName); + } + + /** + * Creates a publisher, subscribes to the topic, sends the specified message + * to the topic, then closes the publisher. This ensures that the single + * message goes immediately. UEB is designed for high throughput and tries + * to batch up multiple messages in each send, but this method wants the + * single message to go immediately. + * + * @param uebMsg + * Message object to send as the payload. + * @throws UebException + * If anything goes wrong, including JSON serialization of the + * specified message object. + */ + public void send(UebMsg uebMsg) throws UebException { + String msg = null; + + CambriaBatchingPublisher pub; + try { + pub = new CambriaClientBuilders.PublisherBuilder() + .authenticatedBy(publisherKey, publisherSecret).usingHosts(urlList).onTopic(topicName).build(); + } catch (MalformedURLException e1) { + logger.error("pub.build Exception ", e1); + throw new UebException(PortalApiConstants.ECOMP_UEB_UNKNOWN_PUBLISH_ERROR, e1, topicName, null, msg); + } catch (GeneralSecurityException e1) { + logger.error("pub.build Exception ", e1); + throw new UebException(PortalApiConstants.ECOMP_UEB_UNKNOWN_PUBLISH_ERROR, e1, topicName, null, msg); + } + + try { + ObjectWriter mapper = new ObjectMapper().writer().withDefaultPrettyPrinter(); + msg = mapper.writeValueAsString(uebMsg); + } catch (JsonProcessingException e) { + throw new UebException(PortalApiConstants.ECOMP_UEB_INVALID_MSG, topicName, null, null); + } + + try { + logger.debug("Publishing to " + topicName + " msg: " + msg); + int NumSent = pub.send(PortalApiConstants.ECOMP_GENERAL_UEB_PARTITION, msg); + if (NumSent == 0) { + throw new UebException(PortalApiConstants.ECOMP_UEB_UNKNOWN_PUBLISH_ERROR, topicName, null, msg); + } + } catch (IOException ex) { + logger.error("Failed to publish", ex); + throw new UebException(PortalApiConstants.ECOMP_UEB_UNKNOWN_PUBLISH_ERROR, ex, topicName, null, msg); + } + + try { + // close the publisher to make sure everything's sent before exiting + pub.close(5, TimeUnit.SECONDS); + } catch (Exception ex) { + logger.error("pub.close Exception ", ex); + throw new UebException(PortalApiConstants.ECOMP_UEB_UNKNOWN_PUBLISH_ERROR, ex, topicName, null, msg); + } + + } +} \ No newline at end of file -- cgit 1.2.3-korg