summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java23
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java10
2 files changed, 21 insertions, 12 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index aed99747..bf3987cb 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -23,21 +23,25 @@ package org.onap.dcaegen2.services.prh.service;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
*/
public class DmaapConsumerJsonParser {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
+
private static final String EVENT = "event";
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
private static final String PNF_REGISTRATION_FIELDS = "pnfRegistrationFields";
@@ -59,7 +63,7 @@ public class DmaapConsumerJsonParser {
}
private Mono<JsonElement> getJsonParserMessage(String message) {
- return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
+ return StringUtils.isEmpty(message) ? logErrorAndReturnMonoEmpty("DmaaP response is empty")
: Mono.fromCallable(() -> new JsonParser().parse(message));
}
@@ -84,7 +88,7 @@ public class DmaapConsumerJsonParser {
private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP ->
- !containsHeader(monoJsonP) ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+ !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header")
: transform(monoJsonP))
.onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty());
}
@@ -101,8 +105,8 @@ public class DmaapConsumerJsonParser {
String pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS);
return (StringUtils.isEmpty(pnfSourceName) || !ipPropertiesNotEmpty(pnfOamIpv4Address, pnfOamIpv6Address))
- ? Mono.error(new DmaapNotFoundException("Incorrect json, consumerDmaapModel can not be created: "
- + printMessage(pnfSourceName, pnfOamIpv4Address, pnfOamIpv6Address))) :
+ ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: "
+ + printMessage(pnfSourceName, pnfOamIpv4Address, pnfOamIpv6Address)) :
Mono.just(ImmutableConsumerDmaapModel.builder()
.correlationId(pnfSourceName)
.ipv4(pnfOamIpv4Address)
@@ -128,4 +132,9 @@ public class DmaapConsumerJsonParser {
+ "\"" + OAM_IPV_6_ADDRESS + "\": \"%s\""
+ "%n}", sourceName, oamIpv4Address, oamIpv6Address);
}
+
+ private <T> Mono<T> logErrorAndReturnMonoEmpty(String messageForLogger) {
+ LOGGER.warn(messageForLogger);
+ return Mono.empty();
+ }
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
index 689a732c..c17e2543 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
@@ -109,25 +109,25 @@ class DmaapConsumerTaskImplTest {
//given
prepareMocksForDmaapConsumer(Optional.empty());
- //then
- StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
- .expectError(DmaapEmptyResponseException.class).verify();
+ //when
+ Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
+ //then
verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
+ assertEquals(null, response.blockFirst());
}
@Test
void whenPassedObjectFits_ReturnsCorrectResponse() {
//given
prepareMocksForDmaapConsumer(Optional.of(message));
+
//when
Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
//then
verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
assertEquals(consumerDmaapModel, response.blockFirst());
-
-
}
private void prepareMocksForDmaapConsumer(Optional<String> message) {