diff options
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/java/org/onap/dcae/common/ConfigProcessors.java | 2 | ||||
-rw-r--r-- | src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java | 28 |
2 files changed, 18 insertions, 12 deletions
diff --git a/src/main/java/org/onap/dcae/common/ConfigProcessors.java b/src/main/java/org/onap/dcae/common/ConfigProcessors.java index 040a3e6e..d53bf23b 100644 --- a/src/main/java/org/onap/dcae/common/ConfigProcessors.java +++ b/src/main/java/org/onap/dcae/common/ConfigProcessors.java @@ -202,7 +202,7 @@ public class ConfigProcessors { } } - private String performOperation(String operation, String value) { + String performOperation(String operation, String value) { log.info("performOperation"); if ("convertMBtoKB".equals(operation)) { float kbValue = Float.parseFloat(value) * 1024; diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 08e16e0c..a7c0a0eb 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017,2020,2023 AT&T Intellectual Property. All rights reserved. * Copyright (C) 2018-2021 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -33,6 +33,7 @@ import java.util.List; import java.util.Objects; import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus; + /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ @@ -48,9 +49,10 @@ public class DMaaPEventPublisher { /** * Reload Dmaap configuration + * * @param dmaapConfiguration Dmaap configuration */ - public void reload(Map<String, PublisherConfig> dmaapConfiguration){ + public void reload(Map<String, PublisherConfig> dmaapConfiguration) { dMaaPConfig = dmaapConfiguration; log.info("reload dmaap configuration"); } @@ -58,22 +60,26 @@ public class DMaaPEventPublisher { public HttpStatus sendEvent(List<VesEvent> vesEvents, String dmaapId) { clearVesUniqueIdFromEvent(vesEvents); io.vavr.collection.List<String> events = mapListOfEventsToVavrList(vesEvents); - Flux<MessageRouterPublishResponse> messageRouterPublishFlux = dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId)); + HttpStatus rc = messageRouterPublishResponse(events, dmaapId); + return rc; + } + + HttpStatus messageRouterPublishResponse(io.vavr.collection.List<String> events, String dmaapId) { + Flux<MessageRouterPublishResponse> messageRouterPublishFlux = + dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId)); MessageRouterPublishResponse messageRouterPublishResponse = messageRouterPublishFlux.blockFirst(); return getHttpStatus(Objects.requireNonNull(messageRouterPublishResponse)); + } private io.vavr.collection.List<String> mapListOfEventsToVavrList(List<VesEvent> vesEvents) { - return io.vavr.collection.List.ofAll(vesEvents) - .map(event -> event.asJsonObject().toString()); + return io.vavr.collection.List.ofAll(vesEvents).map(event -> event.asJsonObject().toString()); } private void clearVesUniqueIdFromEvent(List<VesEvent> events) { - events.stream() - .filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID)) - .forEach(event -> { - log.debug("Removing VESuniqueid object from event"); - event.removeElement(VesEvent.VES_UNIQUE_ID); - }); + events.stream().filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID)).forEach(event -> { + log.debug("Removing VESuniqueid object from event"); + event.removeElement(VesEvent.VES_UNIQUE_ID); + }); } } |