/*- * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * Copyright (C) 2018 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= */ package org.onap.dcae.common.publishing; import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.google.common.cache.*; import io.vavr.collection.Map; import io.vavr.control.Option; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static io.vavr.API.Option; import static org.onap.dcae.common.publishing.VavrUtils.f; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ class DMaaPPublishersCache { private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); private final LoadingCache publishersCache; private AtomicReference> dMaaPConfiguration; DMaaPPublishersCache(Map dMaaPConfiguration) { this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); this.publishersCache = CacheBuilder.newBuilder() .removalListener(new OnPublisherRemovalListener()) .build(new CambriaPublishersCacheLoader()); } DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, OnPublisherRemovalListener onPublisherRemovalListener, Map dMaaPConfiguration) { this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); this.publishersCache = CacheBuilder.newBuilder() .removalListener(onPublisherRemovalListener) .build(dMaaPPublishersCacheLoader); } Option getPublisher(String streamID) { try { return Option(publishersCache.getUnchecked(streamID)); } catch (Exception e) { log.warn("Could not create / load Cambria Publisher for streamID", e); return Option.none(); } } void closePublisherFor(String streamId) { publishersCache.invalidate(streamId); } synchronized void reconfigure(Map newConfig) { Map currentConfig = dMaaPConfiguration.get(); Map removedConfigurations = currentConfig .filterKeys(domain -> !newConfig.containsKey(domain)); Map changedConfigurations = newConfig .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); dMaaPConfiguration.set(newConfig); removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); } static class OnPublisherRemovalListener implements RemovalListener { @Override public void onRemoval(@Nonnull RemovalNotification notification) { CambriaBatchingPublisher publisher = notification.getValue(); if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull try { int timeout = 20; TimeUnit unit = TimeUnit.SECONDS; java.util.List stuck = publisher.close(timeout, unit); if (!stuck.isEmpty()) { log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " + "%s messages were dropped", stuck.size(), timeout, unit)); } } catch (InterruptedException | IOException e) { log.error("Could not close Cambria publisher, some messages might have been dropped", e); Thread.currentThread().interrupt(); } } } } class CambriaPublishersCacheLoader extends CacheLoader { @Override public CambriaBatchingPublisher load(@Nonnull String domain) { return dMaaPConfiguration.get() .get(domain) .toTry(() -> new RuntimeException( f("DMaaP configuration contains no configuration for domain: '%s'", domain))) .flatMap(DMaaPPublishersBuilder::buildPublisher) .get(); } } }