diff options
author | PawelSzalapski <pawel.szalapski@nokia.com> | 2018-07-31 08:18:03 +0200 |
---|---|---|
committer | PawelSzalapski <pawel.szalapski@nokia.com> | 2018-08-01 09:56:00 +0200 |
commit | fc073344d4c0eb8a28bf34c07a8439176cf846ca (patch) | |
tree | 01f5b4789c3d9369eaebb54a9f910a9fa400af1f /src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java | |
parent | d12cd3525284cc41414d8fdae09e2ffbc03a1fbb (diff) |
Replace nsaCore library with Spring
Change-Id: I2227939a67a2cbba2d392136d49ef4419600d186
Issue-ID: DCAEGEN2-602
Signed-off-by: PawelSzalapski <pawel.szalapski@nokia.com>
Diffstat (limited to 'src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java')
-rw-r--r-- | src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java | 44 |
1 files changed, 20 insertions, 24 deletions
diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java index 102d2774..4cdf92da 100644 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java +++ b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java @@ -20,24 +20,20 @@ */ package org.onap.dcae.commonFunction.event.publishing; -import static io.vavr.API.Option; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import com.google.common.cache.*; import io.vavr.collection.Map; import io.vavr.control.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nonnull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static io.vavr.API.Option; +import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) @@ -51,8 +47,8 @@ class DMaaPPublishersCache { DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) { this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); this.publishersCache = CacheBuilder.newBuilder() - .removalListener(new OnPublisherRemovalListener()) - .build(new CambriaPublishersCacheLoader()); + .removalListener(new OnPublisherRemovalListener()) + .build(new CambriaPublishersCacheLoader()); } DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, @@ -60,8 +56,8 @@ class DMaaPPublishersCache { Map<String, PublisherConfig> dMaaPConfiguration) { this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); this.publishersCache = CacheBuilder.newBuilder() - .removalListener(onPublisherRemovalListener) - .build(dMaaPPublishersCacheLoader); + .removalListener(onPublisherRemovalListener) + .build(dMaaPPublishersCacheLoader); } Option<CambriaBatchingPublisher> getPublisher(String streamID) { @@ -80,9 +76,9 @@ class DMaaPPublishersCache { synchronized void reconfigure(Map<String, PublisherConfig> newConfig) { Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get(); Map<String, PublisherConfig> removedConfigurations = currentConfig - .filterKeys(domain -> !newConfig.containsKey(domain)); + .filterKeys(domain -> !newConfig.containsKey(domain)); Map<String, PublisherConfig> changedConfigurations = newConfig - .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); + .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); dMaaPConfiguration.set(newConfig); removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); } @@ -99,7 +95,7 @@ class DMaaPPublishersCache { java.util.List<?> stuck = publisher.close(timeout, unit); if (!stuck.isEmpty()) { log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " - + "%s messages were dropped", stuck.size(), timeout, unit)); + + "%s messages were dropped", stuck.size(), timeout, unit)); } } catch (InterruptedException | IOException e) { log.error("Could not close Cambria publisher, some messages might have been dropped", e); @@ -113,11 +109,11 @@ class DMaaPPublishersCache { @Override public CambriaBatchingPublisher load(@Nonnull String domain) { return dMaaPConfiguration.get() - .get(domain) - .toTry(() -> new RuntimeException( - f("DMaaP configuration contains no configuration for domain: '%s'", domain))) - .flatMap(DMaaPPublishersBuilder::buildPublisher) - .get(); + .get(domain) + .toTry(() -> new RuntimeException( + f("DMaaP configuration contains no configuration for domain: '%s'", domain))) + .flatMap(DMaaPPublishersBuilder::buildPublisher) + .get(); } } |