From 7ab93201e557976ed8b383cb5652fa129d7b36f7 Mon Sep 17 00:00:00 2001 From: Zlatko Murgoski Date: Mon, 31 Dec 2018 11:55:42 +0100 Subject: Fix sonar violation Fix sonar violation' Change-Id: Ia5718d2bcbf9f5efea40d8250b7ad57f6d2eb2f3 Issue-ID: DCAEGEN2-1016 Signed-off-by: Zlatko Murgoski --- .../event/publishing/DMaaPPublishersCache.java | 121 --------------------- 1 file changed, 121 deletions(-) delete mode 100644 src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java (limited to 'src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java') diff --git a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java deleted file mode 100644 index c66cee05..00000000 --- a/src/main/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCache.java +++ /dev/null @@ -1,121 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction.event.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.cache.*; -import io.vavr.collection.Map; -import io.vavr.control.Option; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static io.vavr.API.Option; -import static org.onap.dcae.commonFunction.event.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -class DMaaPPublishersCache { - - private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); - private final LoadingCache publishersCache; - private AtomicReference> dMaaPConfiguration; - - DMaaPPublishersCache(Map dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(new OnPublisherRemovalListener()) - .build(new CambriaPublishersCacheLoader()); - } - - DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, - OnPublisherRemovalListener onPublisherRemovalListener, - Map dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(onPublisherRemovalListener) - .build(dMaaPPublishersCacheLoader); - } - - Option getPublisher(String streamID) { - try { - return Option(publishersCache.getUnchecked(streamID)); - } catch (Exception e) { - log.warn("Could not create / load Cambria Publisher for streamID", e); - return Option.none(); - } - } - - void closePublisherFor(String streamId) { - publishersCache.invalidate(streamId); - } - - synchronized void reconfigure(Map newConfig) { - Map currentConfig = dMaaPConfiguration.get(); - Map removedConfigurations = currentConfig - .filterKeys(domain -> !newConfig.containsKey(domain)); - Map changedConfigurations = newConfig - .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); - dMaaPConfiguration.set(newConfig); - removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); - } - - static class OnPublisherRemovalListener implements RemovalListener { - - @Override - public void onRemoval(@Nonnull RemovalNotification notification) { - CambriaBatchingPublisher publisher = notification.getValue(); - if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull - try { - int timeout = 20; - TimeUnit unit = TimeUnit.SECONDS; - java.util.List stuck = publisher.close(timeout, unit); - if (!stuck.isEmpty()) { - log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " - + "%s messages were dropped", stuck.size(), timeout, unit)); - } - } catch (InterruptedException | IOException e) { - log.error("Could not close Cambria publisher, some messages might have been dropped", e); - Thread.currentThread().interrupt(); - } - } - } - } - - class CambriaPublishersCacheLoader extends CacheLoader { - - @Override - public CambriaBatchingPublisher load(@Nonnull String domain) { - return dMaaPConfiguration.get() - .get(domain) - .toTry(() -> new RuntimeException( - f("DMaaP configuration contains no configuration for domain: '%s'", domain))) - .flatMap(DMaaPPublishersBuilder::buildPublisher) - .get(); - } - } - -} -- cgit 1.2.3-korg