aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java')
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java28
1 files changed, 17 insertions, 11 deletions
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
index 08e16e0c..a7c0a0eb 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* org.onap.dcaegen2.collectors.ves
* ================================================================================
- * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017,2020,2023 AT&T Intellectual Property. All rights reserved.
* Copyright (C) 2018-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Objects;
import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus;
+
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
@@ -48,9 +49,10 @@ public class DMaaPEventPublisher {
/**
* Reload Dmaap configuration
+ *
* @param dmaapConfiguration Dmaap configuration
*/
- public void reload(Map<String, PublisherConfig> dmaapConfiguration){
+ public void reload(Map<String, PublisherConfig> dmaapConfiguration) {
dMaaPConfig = dmaapConfiguration;
log.info("reload dmaap configuration");
}
@@ -58,22 +60,26 @@ public class DMaaPEventPublisher {
public HttpStatus sendEvent(List<VesEvent> vesEvents, String dmaapId) {
clearVesUniqueIdFromEvent(vesEvents);
io.vavr.collection.List<String> events = mapListOfEventsToVavrList(vesEvents);
- Flux<MessageRouterPublishResponse> messageRouterPublishFlux = dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId));
+ HttpStatus rc = messageRouterPublishResponse(events, dmaapId);
+ return rc;
+ }
+
+ HttpStatus messageRouterPublishResponse(io.vavr.collection.List<String> events, String dmaapId) {
+ Flux<MessageRouterPublishResponse> messageRouterPublishFlux =
+ dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId));
MessageRouterPublishResponse messageRouterPublishResponse = messageRouterPublishFlux.blockFirst();
return getHttpStatus(Objects.requireNonNull(messageRouterPublishResponse));
+
}
private io.vavr.collection.List<String> mapListOfEventsToVavrList(List<VesEvent> vesEvents) {
- return io.vavr.collection.List.ofAll(vesEvents)
- .map(event -> event.asJsonObject().toString());
+ return io.vavr.collection.List.ofAll(vesEvents).map(event -> event.asJsonObject().toString());
}
private void clearVesUniqueIdFromEvent(List<VesEvent> events) {
- events.stream()
- .filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID))
- .forEach(event -> {
- log.debug("Removing VESuniqueid object from event");
- event.removeElement(VesEvent.VES_UNIQUE_ID);
- });
+ events.stream().filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID)).forEach(event -> {
+ log.debug("Removing VESuniqueid object from event");
+ event.removeElement(VesEvent.VES_UNIQUE_ID);
+ });
}
}