aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/commonFunction/event/publishing
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/commonFunction/event/publishing')
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java76
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java19
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java20
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java44
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java17
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java4
6 files changed, 88 insertions, 92 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java
index 5865b12c..179e8826 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParser.java
@@ -19,21 +19,19 @@
*/
package org.onap.dcae.commonFunction.event.publishing;
-import static io.vavr.API.List;
-import static io.vavr.API.Try;
-import static io.vavr.API.Tuple;
-import static io.vavr.API.unchecked;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
-
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.control.Option;
import io.vavr.control.Try;
+import org.onap.dcae.commonFunction.AnyNode;
+
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import org.onap.dcae.commonFunction.AnyNode;
+
+import static io.vavr.API.*;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
@@ -43,23 +41,23 @@ public final class DMaaPConfigurationParser {
public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) {
return readFromFile(configLocation)
- .flatMap(DMaaPConfigurationParser::toJSON)
- .flatMap(DMaaPConfigurationParser::toConfigMap);
+ .flatMap(DMaaPConfigurationParser::toJSON)
+ .flatMap(DMaaPConfigurationParser::toConfigMap);
}
private static Try<String> readFromFile(Path configLocation) {
return Try(() -> new String(Files.readAllBytes(configLocation)))
- .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
+ .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
}
private static Try<AnyNode> toJSON(String config) {
return Try(() -> AnyNode.fromString(config))
- .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
+ .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
}
private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
- .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
+ .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
}
private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
@@ -68,40 +66,40 @@ public final class DMaaPConfigurationParser {
private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
return root.get("channels").toList().toMap(
- channel -> channel.get("name").toString(),
- channel -> {
- String destinationsStr = channel.getAsOption("cambria.url")
- .getOrElse(channel.getAsOption("cambria.hosts").get())
- .toString();
- String topic = channel.get("cambria.topic").toString();
- Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
- Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
- List<String> destinations = List(destinationsStr.split(","));
- return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
- });
+ channel -> channel.get("name").toString(),
+ channel -> {
+ String destinationsStr = channel.getAsOption("cambria.url")
+ .getOrElse(channel.getAsOption("cambria.hosts").get())
+ .toString();
+ String topic = channel.get("cambria.topic").toString();
+ Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
+ Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
+ List<String> destinations = List(destinationsStr.split(","));
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
}
private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
return root.keys().toMap(
- channelName -> channelName,
- channelName -> {
- AnyNode channelConfig = root.get(channelName);
- Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
- Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
- URL topicURL = unchecked(
- () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
- String[] pathSegments = topicURL.getPath().substring(1).split("/");
- String topic = pathSegments[1];
- String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
- List<String> destinations = List(destination);
- return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
- });
+ channelName -> channelName,
+ channelName -> {
+ AnyNode channelConfig = root.get(channelName);
+ Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
+ Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
+ URL topicURL = unchecked(
+ () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
+ String[] pathSegments = topicURL.getPath().substring(1).split("/");
+ String topic = pathSegments[1];
+ String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
+ List<String> destinations = List(destination);
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
}
private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword,
String topic, List<String> destinations) {
return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password)))
- .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
- .getOrElse(new PublisherConfig(destinations, topic));
+ .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
+ .getOrElse(new PublisherConfig(destinations, topic));
}
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
index fd9b3ae1..a0ee3bfb 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisher.java
@@ -21,20 +21,21 @@
package org.onap.dcae.commonFunction.event.publishing;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
-
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.clock.SaClock;
import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
import io.vavr.collection.Map;
import io.vavr.control.Try;
-import java.io.IOException;
import org.json.JSONObject;
import org.onap.dcae.commonFunction.VESLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
@@ -55,9 +56,9 @@ class DMaaPEventPublisher implements EventPublisher {
public void sendEvent(JSONObject event, String domain) {
clearVesUniqueIdFromEvent(event);
publishersCache.getPublisher(domain)
- .onEmpty(() ->
- log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
- .forEach(publisher -> sendEvent(event, domain, publisher));
+ .onEmpty(() ->
+ log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
+ .forEach(publisher -> sendEvent(event, domain, publisher));
}
@Override
@@ -67,11 +68,11 @@ class DMaaPEventPublisher implements EventPublisher {
private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) {
Try.run(() -> uncheckedSendEvent(event, domain, publisher))
- .onFailure(exc -> closePublisher(event, domain, exc));
+ .onFailure(exc -> closePublisher(event, domain, exc));
}
private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
- throws IOException {
+ throws IOException {
int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
log.info("Pending messages count: " + pendingMsgs);
@@ -83,7 +84,7 @@ class DMaaPEventPublisher implements EventPublisher {
private void closePublisher(JSONObject event, String domain, Throwable e) {
log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
- event, domain), e);
+ event, domain), e);
publishersCache.closePublisherFor(domain);
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java
index a7865a45..489fcbf0 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersBuilder.java
@@ -19,15 +19,15 @@
*/
package org.onap.dcae.commonFunction.event.publishing;
-import static io.vavr.API.Try;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
-
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import io.vavr.control.Try;
+import static io.vavr.API.Try;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
+
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
@@ -36,7 +36,7 @@ final class DMaaPPublishersBuilder {
@SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) {
return Try(() -> builder(config).build())
- .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
+ .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
}
private static PublisherBuilder builder(PublisherConfig config) {
@@ -49,14 +49,14 @@ final class DMaaPPublishersBuilder {
private static PublisherBuilder authenticatedBuilder(PublisherConfig config) {
return unAuthenticatedBuilder(config)
- .usingHttps()
- .authenticatedByHttp(config.userName().get(), config.password().get());
+ .usingHttps()
+ .authenticatedByHttp(config.userName().get(), config.password().get());
}
private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) {
return new CambriaClientBuilders.PublisherBuilder()
- .usingHosts(config.destinations().mkString(","))
- .onTopic(config.topic())
- .logSendFailuresAfter(5);
+ .usingHosts(config.destinations().mkString(","))
+ .onTopic(config.topic())
+ .logSendFailuresAfter(5);
}
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java
index 102d2774..4cdf92da 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java
@@ -20,24 +20,20 @@
*/
package org.onap.dcae.commonFunction.event.publishing;
-import static io.vavr.API.Option;
-import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
-
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.*;
import io.vavr.collection.Map;
import io.vavr.control.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nonnull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static io.vavr.API.Option;
+import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f;
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
@@ -51,8 +47,8 @@ class DMaaPPublishersCache {
DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) {
this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
this.publishersCache = CacheBuilder.newBuilder()
- .removalListener(new OnPublisherRemovalListener())
- .build(new CambriaPublishersCacheLoader());
+ .removalListener(new OnPublisherRemovalListener())
+ .build(new CambriaPublishersCacheLoader());
}
DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader,
@@ -60,8 +56,8 @@ class DMaaPPublishersCache {
Map<String, PublisherConfig> dMaaPConfiguration) {
this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
this.publishersCache = CacheBuilder.newBuilder()
- .removalListener(onPublisherRemovalListener)
- .build(dMaaPPublishersCacheLoader);
+ .removalListener(onPublisherRemovalListener)
+ .build(dMaaPPublishersCacheLoader);
}
Option<CambriaBatchingPublisher> getPublisher(String streamID) {
@@ -80,9 +76,9 @@ class DMaaPPublishersCache {
synchronized void reconfigure(Map<String, PublisherConfig> newConfig) {
Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get();
Map<String, PublisherConfig> removedConfigurations = currentConfig
- .filterKeys(domain -> !newConfig.containsKey(domain));
+ .filterKeys(domain -> !newConfig.containsKey(domain));
Map<String, PublisherConfig> changedConfigurations = newConfig
- .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e)));
+ .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e)));
dMaaPConfiguration.set(newConfig);
removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1));
}
@@ -99,7 +95,7 @@ class DMaaPPublishersCache {
java.util.List<?> stuck = publisher.close(timeout, unit);
if (!stuck.isEmpty()) {
log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', "
- + "%s messages were dropped", stuck.size(), timeout, unit));
+ + "%s messages were dropped", stuck.size(), timeout, unit));
}
} catch (InterruptedException | IOException e) {
log.error("Could not close Cambria publisher, some messages might have been dropped", e);
@@ -113,11 +109,11 @@ class DMaaPPublishersCache {
@Override
public CambriaBatchingPublisher load(@Nonnull String domain) {
return dMaaPConfiguration.get()
- .get(domain)
- .toTry(() -> new RuntimeException(
- f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
- .flatMap(DMaaPPublishersBuilder::buildPublisher)
- .get();
+ .get(domain)
+ .toTry(() -> new RuntimeException(
+ f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
+ .flatMap(DMaaPPublishersBuilder::buildPublisher)
+ .get();
}
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java
index 4a056778..f1cbb8e5 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/PublisherConfig.java
@@ -21,6 +21,7 @@ package org.onap.dcae.commonFunction.event.publishing;
import io.vavr.collection.List;
import io.vavr.control.Option;
+
import java.util.Objects;
/**
@@ -76,9 +77,9 @@ public final class PublisherConfig {
}
PublisherConfig that = (PublisherConfig) o;
return Objects.equals(destinations, that.destinations) &&
- Objects.equals(topic, that.topic) &&
- Objects.equals(userName, that.userName) &&
- Objects.equals(password, that.password);
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(userName, that.userName) &&
+ Objects.equals(password, that.password);
}
@Override
@@ -89,10 +90,10 @@ public final class PublisherConfig {
@Override
public String toString() {
return "PublisherConfig{" +
- "destinations=" + destinations +
- ", topic='" + topic + '\'' +
- ", userName='" + userName + '\'' +
- ", password='" + password + '\'' +
- '}';
+ "destinations=" + destinations +
+ ", topic='" + topic + '\'' +
+ ", userName='" + userName + '\'' +
+ ", password='" + password + '\'' +
+ '}';
}
}
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java
index 9bf3ef8c..78f34ff4 100644
--- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java
+++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/VavrUtils.java
@@ -19,11 +19,11 @@
*/
package org.onap.dcae.commonFunction.event.publishing;
-import static io.vavr.API.$;
-
import io.vavr.API;
import io.vavr.API.Match.Case;
+import static io.vavr.API.$;
+
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/