From fc073344d4c0eb8a28bf34c07a8439176cf846ca Mon Sep 17 00:00:00 2001 From: PawelSzalapski Date: Tue, 31 Jul 2018 08:18:03 +0200 Subject: Replace nsaCore library with Spring Change-Id: I2227939a67a2cbba2d392136d49ef4419600d186 Issue-ID: DCAEGEN2-602 Signed-off-by: PawelSzalapski --- .../event/publishing/DMaaPConfigurationParser.java | 76 +++++++++++----------- .../event/publishing/DMaaPEventPublisher.java | 19 +++--- .../event/publishing/DMaaPPublishersBuilder.java | 20 +++--- .../event/publishing/DMaaPPublishersCache.java | 44 ++++++------- .../event/publishing/PublisherConfig.java | 17 ++--- .../commonFunction/event/publishing/VavrUtils.java | 4 +- 6 files changed, 88 insertions(+), 92 deletions(-) (limited to 'src/main/java/org/onap/dcae/commonFunction/event/publishing') diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java index 5865b12c..179e8826 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java @@ -19,21 +19,19 @@ */ package org.onap.dcae.commonFunction.event.publishing; -import static io.vavr.API.List; -import static io.vavr.API.Try; -import static io.vavr.API.Tuple; -import static io.vavr.API.unchecked; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - import io.vavr.collection.List; import io.vavr.collection.Map; import io.vavr.control.Option; import io.vavr.control.Try; +import org.onap.dcae.commonFunction.AnyNode; + import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; -import org.onap.dcae.commonFunction.AnyNode; + +import static io.vavr.API.*; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) @@ -43,23 +41,23 @@ public final class DMaaPConfigurationParser { public static Try> parseToDomainMapping(Path configLocation) { return readFromFile(configLocation) - .flatMap(DMaaPConfigurationParser::toJSON) - .flatMap(DMaaPConfigurationParser::toConfigMap); + .flatMap(DMaaPConfigurationParser::toJSON) + .flatMap(DMaaPConfigurationParser::toConfigMap); } private static Try readFromFile(Path configLocation) { return Try(() -> new String(Files.readAllBytes(configLocation))) - .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation))); + .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation))); } private static Try toJSON(String config) { return Try(() -> AnyNode.fromString(config)) - .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config))); + .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config))); } private static Try> toConfigMap(AnyNode config) { return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config)) - .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); + .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); } private static boolean usesLegacyFormat(AnyNode dMaaPConfig) { @@ -68,40 +66,40 @@ public final class DMaaPConfigurationParser { private static Map parseLegacyFormat(AnyNode root) { return root.get("channels").toList().toMap( - channel -> channel.get("name").toString(), - channel -> { - String destinationsStr = channel.getAsOption("cambria.url") - .getOrElse(channel.getAsOption("cambria.hosts").get()) - .toString(); - String topic = channel.get("cambria.topic").toString(); - Option maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); - Option maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); - List destinations = List(destinationsStr.split(",")); - return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); - }); + channel -> channel.get("name").toString(), + channel -> { + String destinationsStr = channel.getAsOption("cambria.url") + .getOrElse(channel.getAsOption("cambria.hosts").get()) + .toString(); + String topic = channel.get("cambria.topic").toString(); + Option maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); + Option maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); + List destinations = List(destinationsStr.split(",")); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); } private static Map parseNewFormat(AnyNode root) { return root.keys().toMap( - channelName -> channelName, - channelName -> { - AnyNode channelConfig = root.get(channelName); - Option maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString); - Option maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString); - URL topicURL = unchecked( - () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply(); - String[] pathSegments = topicURL.getPath().substring(1).split("/"); - String topic = pathSegments[1]; - String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost(); - List destinations = List(destination); - return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); - }); + channelName -> channelName, + channelName -> { + AnyNode channelConfig = root.get(channelName); + Option maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString); + Option maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString); + URL topicURL = unchecked( + () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply(); + String[] pathSegments = topicURL.getPath().substring(1).split("/"); + String topic = pathSegments[1]; + String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost(); + List destinations = List(destination); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); } private static PublisherConfig buildBasedOnAuth(Option maybeUser, Option maybePassword, String topic, List destinations) { return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password))) - .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2)) - .getOrElse(new PublisherConfig(destinations, topic)); + .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2)) + .getOrElse(new PublisherConfig(destinations, topic)); } } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java index fd9b3ae1..a0ee3bfb 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java @@ -21,20 +21,21 @@ package org.onap.dcae.commonFunction.event.publishing; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; import io.vavr.control.Try; -import java.io.IOException; import org.json.JSONObject; import org.onap.dcae.commonFunction.VESLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ @@ -55,9 +56,9 @@ class DMaaPEventPublisher implements EventPublisher { public void sendEvent(JSONObject event, String domain) { clearVesUniqueIdFromEvent(event); publishersCache.getPublisher(domain) - .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) - .forEach(publisher -> sendEvent(event, domain, publisher)); + .onEmpty(() -> + log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) + .forEach(publisher -> sendEvent(event, domain, publisher)); } @Override @@ -67,11 +68,11 @@ class DMaaPEventPublisher implements EventPublisher { private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { Try.run(() -> uncheckedSendEvent(event, domain, publisher)) - .onFailure(exc -> closePublisher(event, domain, exc)); + .onFailure(exc -> closePublisher(event, domain, exc)); } private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) - throws IOException { + throws IOException { int pendingMsgs = publisher.send("MyPartitionKey", event.toString()); if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { log.info("Pending messages count: " + pendingMsgs); @@ -83,7 +84,7 @@ class DMaaPEventPublisher implements EventPublisher { private void closePublisher(JSONObject event, String domain, Throwable e) { log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, domain), e); + event, domain), e); publishersCache.closePublisherFor(domain); } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java index a7865a45..489fcbf0 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java @@ -19,15 +19,15 @@ */ package org.onap.dcae.commonFunction.event.publishing; -import static io.vavr.API.Try; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; import io.vavr.control.Try; +import static io.vavr.API.Try; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; + /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ @@ -36,7 +36,7 @@ final class DMaaPPublishersBuilder { @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do") static Try buildPublisher(PublisherConfig config) { return Try(() -> builder(config).build()) - .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); + .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); } private static PublisherBuilder builder(PublisherConfig config) { @@ -49,14 +49,14 @@ final class DMaaPPublishersBuilder { private static PublisherBuilder authenticatedBuilder(PublisherConfig config) { return unAuthenticatedBuilder(config) - .usingHttps() - .authenticatedByHttp(config.userName().get(), config.password().get()); + .usingHttps() + .authenticatedByHttp(config.userName().get(), config.password().get()); } private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) { return new CambriaClientBuilders.PublisherBuilder() - .usingHosts(config.destinations().mkString(",")) - .onTopic(config.topic()) - .logSendFailuresAfter(5); + .usingHosts(config.destinations().mkString(",")) + .onTopic(config.topic()) + .logSendFailuresAfter(5); } } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java index 102d2774..4cdf92da 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java @@ -20,24 +20,20 @@ */ package org.onap.dcae.commonFunction.event.publishing; -import static io.vavr.API.Option; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import com.google.common.cache.*; import io.vavr.collection.Map; import io.vavr.control.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nonnull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static io.vavr.API.Option; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) @@ -51,8 +47,8 @@ class DMaaPPublishersCache { DMaaPPublishersCache(Map dMaaPConfiguration) { this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); this.publishersCache = CacheBuilder.newBuilder() - .removalListener(new OnPublisherRemovalListener()) - .build(new CambriaPublishersCacheLoader()); + .removalListener(new OnPublisherRemovalListener()) + .build(new CambriaPublishersCacheLoader()); } DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, @@ -60,8 +56,8 @@ class DMaaPPublishersCache { Map dMaaPConfiguration) { this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); this.publishersCache = CacheBuilder.newBuilder() - .removalListener(onPublisherRemovalListener) - .build(dMaaPPublishersCacheLoader); + .removalListener(onPublisherRemovalListener) + .build(dMaaPPublishersCacheLoader); } Option getPublisher(String streamID) { @@ -80,9 +76,9 @@ class DMaaPPublishersCache { synchronized void reconfigure(Map newConfig) { Map currentConfig = dMaaPConfiguration.get(); Map removedConfigurations = currentConfig - .filterKeys(domain -> !newConfig.containsKey(domain)); + .filterKeys(domain -> !newConfig.containsKey(domain)); Map changedConfigurations = newConfig - .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); + .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); dMaaPConfiguration.set(newConfig); removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); } @@ -99,7 +95,7 @@ class DMaaPPublishersCache { java.util.List stuck = publisher.close(timeout, unit); if (!stuck.isEmpty()) { log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " - + "%s messages were dropped", stuck.size(), timeout, unit)); + + "%s messages were dropped", stuck.size(), timeout, unit)); } } catch (InterruptedException | IOException e) { log.error("Could not close Cambria publisher, some messages might have been dropped", e); @@ -113,11 +109,11 @@ class DMaaPPublishersCache { @Override public CambriaBatchingPublisher load(@Nonnull String domain) { return dMaaPConfiguration.get() - .get(domain) - .toTry(() -> new RuntimeException( - f("DMaaP configuration contains no configuration for domain: '%s'", domain))) - .flatMap(DMaaPPublishersBuilder::buildPublisher) - .get(); + .get(domain) + .toTry(() -> new RuntimeException( + f("DMaaP configuration contains no configuration for domain: '%s'", domain))) + .flatMap(DMaaPPublishersBuilder::buildPublisher) + .get(); } } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java index 4a056778..f1cbb8e5 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java @@ -21,6 +21,7 @@ package org.onap.dcae.commonFunction.event.publishing; import io.vavr.collection.List; import io.vavr.control.Option; + import java.util.Objects; /** @@ -76,9 +77,9 @@ public final class PublisherConfig { } PublisherConfig that = (PublisherConfig) o; return Objects.equals(destinations, that.destinations) && - Objects.equals(topic, that.topic) && - Objects.equals(userName, that.userName) && - Objects.equals(password, that.password); + Objects.equals(topic, that.topic) && + Objects.equals(userName, that.userName) && + Objects.equals(password, that.password); } @Override @@ -89,10 +90,10 @@ public final class PublisherConfig { @Override public String toString() { return "PublisherConfig{" + - "destinations=" + destinations + - ", topic='" + topic + '\'' + - ", userName='" + userName + '\'' + - ", password='" + password + '\'' + - '}'; + "destinations=" + destinations + + ", topic='" + topic + '\'' + + ", userName='" + userName + '\'' + + ", password='" + password + '\'' + + '}'; } } diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java index 9bf3ef8c..78f34ff4 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java @@ -19,11 +19,11 @@ */ package org.onap.dcae.commonFunction.event.publishing; -import static io.vavr.API.$; - import io.vavr.API; import io.vavr.API.Match.Case; +import static io.vavr.API.$; + /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ -- cgit 1.2.3-korg