diff options
author | Maciej Malewski <maciej.malewski@nokia.com> | 2021-06-08 09:04:48 +0200 |
---|---|---|
committer | Maciej Malewski <maciej.malewski@nokia.com> | 2021-06-17 10:03:49 +0200 |
commit | 74b598291ed2461e0e482f556baf2943a97a54f2 (patch) | |
tree | 22e1140bc0f27680be4d47d40b0c94f7205b45b3 /src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java | |
parent | 26be283f4a7044aea4ee0ca480fde20eb5233ee2 (diff) |
Replace cambria with DmaaP client
- remove cambria, add DmaaP client
- sending event for many topics at once is no longer supported
- add backward compatibility status codes
- add additional validation for batchEvent
Issue-ID: DCAEGEN2-1483
Signed-off-by: Maciej Malewski <maciej.malewski@nokia.com>
Change-Id: I945c38b4ab04b697ecfabd5ce38502f83fa70d1a
Diffstat (limited to 'src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java')
-rw-r--r-- | src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java | 121 |
1 files changed, 0 insertions, 121 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java deleted file mode 100644 index b7997ef9..00000000 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java +++ /dev/null @@ -1,121 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.common.publishing; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.google.common.cache.*; -import io.vavr.collection.Map; -import io.vavr.control.Option; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static io.vavr.API.Option; -import static org.onap.dcae.common.publishing.VavrUtils.f; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -class DMaaPPublishersCache { - - private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); - private final LoadingCache<String, CambriaBatchingPublisher> publishersCache; - private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration; - - DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(new OnPublisherRemovalListener()) - .build(new CambriaPublishersCacheLoader()); - } - - DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, - OnPublisherRemovalListener onPublisherRemovalListener, - Map<String, PublisherConfig> dMaaPConfiguration) { - this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); - this.publishersCache = CacheBuilder.newBuilder() - .removalListener(onPublisherRemovalListener) - .build(dMaaPPublishersCacheLoader); - } - - Option<CambriaBatchingPublisher> getPublisher(String streamID) { - try { - return Option(publishersCache.getUnchecked(streamID)); - } catch (Exception e) { - log.warn("Could not create / load Cambria Publisher for streamID", e); - return Option.none(); - } - } - - void closePublisherFor(String streamId) { - publishersCache.invalidate(streamId); - } - - synchronized void reconfigure(Map<String, PublisherConfig> newConfig) { - Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get(); - Map<String, PublisherConfig> removedConfigurations = currentConfig - .filterKeys(domain -> !newConfig.containsKey(domain)); - Map<String, PublisherConfig> changedConfigurations = newConfig - .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e))); - dMaaPConfiguration.set(newConfig); - removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1)); - } - - static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> { - - @Override - public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) { - CambriaBatchingPublisher publisher = notification.getValue(); - if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull - try { - int timeout = 20; - TimeUnit unit = TimeUnit.SECONDS; - java.util.List<?> stuck = publisher.close(timeout, unit); - if (!stuck.isEmpty()) { - log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " - + "%s messages were dropped", stuck.size(), timeout, unit)); - } - } catch (InterruptedException | IOException e) { - log.error("Could not close Cambria publisher, some messages might have been dropped", e); - Thread.currentThread().interrupt(); - } - } - } - } - - class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> { - - @Override - public CambriaBatchingPublisher load(@Nonnull String domain) { - return dMaaPConfiguration.get() - .get(domain) - .toTry(() -> new RuntimeException( - f("DMaaP configuration contains no configuration for domain: '%s'", domain))) - .flatMap(DMaaPPublishersBuilder::buildPublisher) - .get(); - } - } - -} |