diff options
Diffstat (limited to 'src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java')
-rw-r--r-- | src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java | 84 |
1 files changed, 29 insertions, 55 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 2b4cfc15..08e16e0c 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018,2020 Nokia. All rights reserved. + * Copyright (C) 2018-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,36 +21,29 @@ package org.onap.dcae.common.publishing; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.clock.SaClock; -import com.att.nsa.logging.LoggingContext; -import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; -import io.vavr.control.Try; -import org.onap.dcae.common.VESLogger; import org.onap.dcae.common.model.VesEvent; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import reactor.core.publisher.Flux; -import java.io.IOException; - -import static org.onap.dcae.common.publishing.VavrUtils.f; +import java.util.List; +import java.util.Objects; +import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ public class DMaaPEventPublisher { - private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); - private DMaaPPublishersCache publishersCache; - private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output"); - - DMaaPEventPublisher(DMaaPPublishersCache publishersCache) { - this.publishersCache = publishersCache; - } + private Map<String, PublisherConfig> dMaaPConfig; + private final Publisher dmaapPublisher; public DMaaPEventPublisher(Map<String, PublisherConfig> dMaaPConfig) { - this(new DMaaPPublishersCache(dMaaPConfig)); + this.dMaaPConfig = dMaaPConfig; + dmaapPublisher = new Publisher(); } /** @@ -58,48 +51,29 @@ public class DMaaPEventPublisher { * @param dmaapConfiguration Dmaap configuration */ public void reload(Map<String, PublisherConfig> dmaapConfiguration){ - this.publishersCache = new DMaaPPublishersCache(dmaapConfiguration); - } - - public void sendEvent(VesEvent vesEvent, String dmaapId){ - clearVesUniqueIdFromEvent(vesEvent); - publishersCache.getPublisher(dmaapId) - .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent))) - .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher)); - } - - private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) { - Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher)) - .onFailure(exc -> closePublisher(event, dmaapId, exc)); + dMaaPConfig = dmaapConfiguration; + log.info("reload dmaap configuration"); } - private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) - throws IOException { - - String pk = event.getPK(); - int pendingMsgs = publisher.send(pk, event.asJsonObject().toString()); - if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { - log.info("Pending messages count: " + pendingMsgs); - } - String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId); - log.info(infoMsg); - outputLogger.info(infoMsg); + public HttpStatus sendEvent(List<VesEvent> vesEvents, String dmaapId) { + clearVesUniqueIdFromEvent(vesEvents); + io.vavr.collection.List<String> events = mapListOfEventsToVavrList(vesEvents); + Flux<MessageRouterPublishResponse> messageRouterPublishFlux = dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId)); + MessageRouterPublishResponse messageRouterPublishResponse = messageRouterPublishFlux.blockFirst(); + return getHttpStatus(Objects.requireNonNull(messageRouterPublishResponse)); } - private void closePublisher(VesEvent event, String dmaapId, Throwable e) { - log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, dmaapId), e); - publishersCache.closePublisherFor(dmaapId); + private io.vavr.collection.List<String> mapListOfEventsToVavrList(List<VesEvent> vesEvents) { + return io.vavr.collection.List.ofAll(vesEvents) + .map(event -> event.asJsonObject().toString()); } - private void clearVesUniqueIdFromEvent(VesEvent event) { - if (event.hasType(VesEvent.VES_UNIQUE_ID)) { - String uuid = event.getUniqueId().toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("Removing VESuniqueid object from event"); - event.removeElement(VesEvent.VES_UNIQUE_ID); - } + private void clearVesUniqueIdFromEvent(List<VesEvent> events) { + events.stream() + .filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID)) + .forEach(event -> { + log.debug("Removing VESuniqueid object from event"); + event.removeElement(VesEvent.VES_UNIQUE_ID); + }); } } |