aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java')
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java84
1 files changed, 29 insertions, 55 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
index 2b4cfc15..08e16e0c 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.collectors.ves
* ================================================================================
* Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
+ * Copyright (C) 2018-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,36 +21,29 @@
package org.onap.dcae.common.publishing;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
import io.vavr.collection.Map;
-import io.vavr.control.Try;
-import org.onap.dcae.common.VESLogger;
import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import reactor.core.publisher.Flux;
-import java.io.IOException;
-
-import static org.onap.dcae.common.publishing.VavrUtils.f;
+import java.util.List;
+import java.util.Objects;
+import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus;
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
public class DMaaPEventPublisher {
- private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
- private DMaaPPublishersCache publishersCache;
- private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");
-
- DMaaPEventPublisher(DMaaPPublishersCache publishersCache) {
- this.publishersCache = publishersCache;
- }
+ private Map<String, PublisherConfig> dMaaPConfig;
+ private final Publisher dmaapPublisher;
public DMaaPEventPublisher(Map<String, PublisherConfig> dMaaPConfig) {
- this(new DMaaPPublishersCache(dMaaPConfig));
+ this.dMaaPConfig = dMaaPConfig;
+ dmaapPublisher = new Publisher();
}
/**
@@ -58,48 +51,29 @@ public class DMaaPEventPublisher {
* @param dmaapConfiguration Dmaap configuration
*/
public void reload(Map<String, PublisherConfig> dmaapConfiguration){
- this.publishersCache = new DMaaPPublishersCache(dmaapConfiguration);
- }
-
- public void sendEvent(VesEvent vesEvent, String dmaapId){
- clearVesUniqueIdFromEvent(vesEvent);
- publishersCache.getPublisher(dmaapId)
- .onEmpty(() ->
- log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent)))
- .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher));
- }
-
- private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) {
- Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher))
- .onFailure(exc -> closePublisher(event, dmaapId, exc));
+ dMaaPConfig = dmaapConfiguration;
+ log.info("reload dmaap configuration");
}
- private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher)
- throws IOException {
-
- String pk = event.getPK();
- int pendingMsgs = publisher.send(pk, event.asJsonObject().toString());
- if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
- log.info("Pending messages count: " + pendingMsgs);
- }
- String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId);
- log.info(infoMsg);
- outputLogger.info(infoMsg);
+ public HttpStatus sendEvent(List<VesEvent> vesEvents, String dmaapId) {
+ clearVesUniqueIdFromEvent(vesEvents);
+ io.vavr.collection.List<String> events = mapListOfEventsToVavrList(vesEvents);
+ Flux<MessageRouterPublishResponse> messageRouterPublishFlux = dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId));
+ MessageRouterPublishResponse messageRouterPublishResponse = messageRouterPublishFlux.blockFirst();
+ return getHttpStatus(Objects.requireNonNull(messageRouterPublishResponse));
}
- private void closePublisher(VesEvent event, String dmaapId, Throwable e) {
- log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
- event, dmaapId), e);
- publishersCache.closePublisherFor(dmaapId);
+ private io.vavr.collection.List<String> mapListOfEventsToVavrList(List<VesEvent> vesEvents) {
+ return io.vavr.collection.List.ofAll(vesEvents)
+ .map(event -> event.asJsonObject().toString());
}
- private void clearVesUniqueIdFromEvent(VesEvent event) {
- if (event.hasType(VesEvent.VES_UNIQUE_ID)) {
- String uuid = event.getUniqueId().toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- log.debug("Removing VESuniqueid object from event");
- event.removeElement(VesEvent.VES_UNIQUE_ID);
- }
+ private void clearVesUniqueIdFromEvent(List<VesEvent> events) {
+ events.stream()
+ .filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID))
+ .forEach(event -> {
+ log.debug("Removing VESuniqueid object from event");
+ event.removeElement(VesEvent.VES_UNIQUE_ID);
+ });
}
}