summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java80
1 files changed, 49 insertions, 31 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
index 6aaf1d6..9e0b87c 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2021 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,57 +21,74 @@
package org.onap.dcaegen2.services.pmmapper.messagerouter;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.List;
-
import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
+import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.logging.ref.slf4j.ONAPLogAdapter;
import org.slf4j.LoggerFactory;
-
import reactor.core.publisher.Flux;
+import java.util.List;
+import java.util.stream.Collectors;
+
public class VESPublisher {
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class));
- private RequestSender sender;
- private MapperConfig config;
+ private final DmaapRequestSender sender;
+ private final MapperConfig config;
public VESPublisher(MapperConfig config) {
- this(config, new RequestSender());
+ this(config, new DmaapRequestSender());
}
- public VESPublisher(MapperConfig config, RequestSender sender) {
+ public VESPublisher(MapperConfig config, DmaapRequestSender sender) {
this.sender = sender;
this.config = config;
}
public Flux<Event> publish(List<Event> events) {
logger.unwrap().info("Publishing VES events to messagerouter.");
- Event event = events.get(0);
- try {
- events.forEach(e -> this.publish(e.getVes()));
- logger.unwrap().info("Successfully published VES events to messagerouter.");
- } catch (MRPublisherException e) {
- logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", e);
- return Flux.empty();
- }
- return Flux.just(event);
+ Event first = events.get(0);
+ List<String> vesEvents = minifiedVesEvents(events);
+ return publishEvents(vesEvents)
+ .filter(DmaapResponse::failed)
+ .takeLast(1)
+ .flatMap(this::toFluxError)
+ .defaultIfEmpty(first)
+ .doOnComplete(() -> logger.unwrap().info("Successfully published VES events to messagerouter."))
+ .onErrorResume(this::resume);
+ }
+
+ private List<String> minifiedVesEvents(List<Event> events) {
+ return events.stream()
+ .map(Event::getVes)
+ .map(vesEvent -> vesEvent.replace("\n", ""))
+ .collect(Collectors.toList());
+ }
+
+ private Flux<MessageRouterPublishResponse> publishEvents(List<String> vesEvents) {
+ String topicUrl = config.getPublisherTopicUrl();
+ AafCredentials credentials = aafCredentials();
+ return sender.send(topicUrl, vesEvents, credentials);
+ }
+
+ private Flux<Event> toFluxError(MessageRouterPublishResponse response) {
+ return Flux.error(new MRPublisherException(response.failReason()));
+ }
+
+ private Flux<Event> resume(Throwable t) {
+ logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", t);
+ return Flux.empty();
}
- private void publish(String ves) {
- try {
- String topicUrl = config.getPublisherTopicUrl();
- ves = ves.replaceAll("\n", "");
- String userCredentials = Base64.getEncoder()
- .encodeToString((this.config.getPublisherUserName() + ":" +
- this.config.getPublisherPassword())
- .getBytes(StandardCharsets.UTF_8));
- sender.send("POST", topicUrl, ves, userCredentials);
- } catch (Exception e) {
- throw new MRPublisherException(e.getMessage(), e);
- }
+ private AafCredentials aafCredentials() {
+ return ImmutableAafCredentials.builder()
+ .username(config.getPublisherUserName())
+ .password(config.getPublisherPassword())
+ .build();
}
}